/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.Inbox;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.OutboundEdgeStream;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.ArrayDequeOutbox;
import com.hazelcast.jet.impl.util.CircularListCursor;
import com.hazelcast.jet.impl.util.DoneItem;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.util.Preconditions;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/**
 * A {@link Tasklet} that drives a single {@link Processor}.
 *
 * <p>Each invocation of {@link #call()} performs one step of the following cycle:
 * <ol>
 *     <li>if the inbox is empty, drain one of the inbound edge streams into it;</li>
 *     <li>depending on the resulting state, either hand the inbox to the processor,
 *         tell the processor that the current inbound edge is exhausted, or (once
 *         there is no inbound stream left) ask the processor to complete;</li>
 *     <li>push whatever is waiting in the outbox to the outbound collectors.</li>
 * </ol>
 *
 * <p>Inbound streams are grouped by {@code priority()}; the groups are consumed one
 * after another in ascending order of that value, and the streams inside a group
 * are visited through a {@link CircularListCursor}.
 */
public class ProcessorTasklet
implements Tasklet {
    private final ArrayDequeInbox inbox = new ArrayDequeInbox();
    private final ProgressTracker progTracker = new ProgressTracker();
    private final Processor processor;
    /** Remaining groups of inbound streams, ordered by ascending priority value. */
    private final Queue<ArrayList<InboundEdgeStream>> instreamGroupQueue;
    private final String vertexName;
    private final Processor.Context context;
    /** Cursor over the group currently being drained; {@code null} once no group is left. */
    private CircularListCursor<InboundEdgeStream> instreamCursor;
    private final ArrayDequeOutbox outbox;
    /** Outbound streams sorted by ordinal; indexed by outbox bucket number when flushing. */
    private final OutboundEdgeStream[] outstreams;
    /** The inbound stream that was drained most recently. */
    private InboundEdgeStream currInstream;
    /** Whether the most recent drain of {@link #currInstream} reported "done". */
    private boolean currInstreamExhausted;
    /** Set once {@code processor.complete()} has returned {@code true}. */
    private boolean processorCompleted;

    /**
     * Creates the tasklet.
     *
     * @param vertexName name of the vertex, used only by {@link #toString()}
     * @param context    context handed to the processor in {@link #init()}
     * @param processor  the processor to drive, must not be {@code null}
     * @param instreams  inbound edge streams, grouped here by their priority
     * @param outstreams outbound edge streams, sorted here by their ordinal
     */
    public ProcessorTasklet(String vertexName, Processor.Context context, Processor processor, List<InboundEdgeStream> instreams, List<OutboundEdgeStream> outstreams) {
        Preconditions.checkNotNull(processor, "processor");
        this.vertexName = vertexName;
        this.context = context;
        this.processor = processor;

        // TreeMap keeps the groups sorted by priority; values() iterates in key order.
        this.instreamGroupQueue = new ArrayDeque<>(
                instreams.stream()
                         .collect(Collectors.groupingBy(
                                 InboundEdgeStream::priority, TreeMap::new, Collectors.toCollection(ArrayList::new)))
                         .values());

        this.outstreams = outstreams.stream()
                                    .sorted(Comparator.comparing(OutboundEdgeStream::ordinal))
                                    .toArray(OutboundEdgeStream[]::new);

        int[] highWaterMarks = new int[this.outstreams.length];
        for (int i = 0; i < highWaterMarks.length; i++) {
            highWaterMarks[i] = this.outstreams[i].getHighWaterMark();
        }
        this.outbox = new ArrayDequeOutbox(outstreams.size(), highWaterMarks);

        this.instreamCursor = popInstreamGroup();
    }

    /** Initializes the processor with this tasklet's outbox and context. */
    @Override
    public void init() {
        processor.init(outbox, context);
    }

    /** Delegates to {@link Processor#isCooperative()}. */
    @Override
    public boolean isCooperative() {
        return processor.isCooperative();
    }

    /**
     * Executes one step of the tasklet (see the class description) and reports
     * the progress tracked during that step.
     *
     * @return the state accumulated in the progress tracker during this call
     */
    @Override
    @Nonnull
    public ProgressState call() {
        progTracker.reset();
        tryFillInbox();
        if (progTracker.isDone()) {
            // Nothing marked the tracker "not done": there is no input left to consume.
            completeIfNeeded();
        } else if (!inbox.isEmpty()) {
            tryProcessInbox();
        } else if (currInstreamExhausted) {
            completeCurrentEdge();
        }
        tryFlushOutbox();
        return progTracker.toProgressState();
    }

    /**
     * Notifies the processor that the current inbound edge is exhausted. The
     * current stream is forgotten only when the processor reports that it has
     * finished handling the edge; otherwise the notification is repeated on a
     * later call.
     */
    private void completeCurrentEdge() {
        progTracker.madeProgress(true);
        boolean edgeCompleted = processor.completeEdge(currInstream.ordinal());
        if (edgeCompleted) {
            currInstream = null;
        }
    }

    /**
     * Polls the next group of inbound streams and wraps it in a cursor.
     *
     * @return a cursor over the next group, or {@code null} if the queue is empty
     */
    private CircularListCursor<InboundEdgeStream> popInstreamGroup() {
        ArrayList<InboundEdgeStream> nextGroup = instreamGroupQueue.poll();
        return nextGroup == null ? null : new CircularListCursor<>(nextGroup);
    }

    /**
     * Drains inbound streams of the current group into the inbox, one stream
     * at a time, until a drain reports progress, the cursor arrives back at the
     * stream it started from, or the cursor can no longer advance (in which case
     * the next group is fetched).
     *
     * <p>Does nothing except marking the tracker "not done" when the inbox still
     * holds items or when an exhausted current stream has not yet been reported
     * as completed by the processor. Leaves the tracker untouched when there is
     * no cursor left.
     */
    private void tryFillInbox() {
        boolean awaitingEdgeCompletion = currInstream != null && currInstreamExhausted;
        if (!inbox.isEmpty() || awaitingEdgeCompletion) {
            progTracker.notDone();
            return;
        }
        if (instreamCursor == null) {
            return;
        }
        progTracker.notDone();
        final InboundEdgeStream startingStream = instreamCursor.value();
        while (true) {
            currInstream = instreamCursor.value();
            ProgressState drainResult = currInstream.drainTo(inbox);
            progTracker.madeProgress(drainResult.isMadeProgress());
            currInstreamExhausted = drainResult.isDone();
            if (currInstreamExhausted) {
                instreamCursor.remove();
            }
            if (!instreamCursor.advance()) {
                // Presumably the group has no streams left; move on to the next group.
                instreamCursor = popInstreamGroup();
                return;
            }
            if (drainResult.isMadeProgress() || instreamCursor.value() == startingStream) {
                return;
            }
        }
    }

    /**
     * Hands the inbox to the processor unless the outbox is at its high-water
     * mark. Marks the tracker "not done" if the processor left items in the inbox.
     */
    private void tryProcessInbox() {
        if (outbox.isHighWater()) {
            progTracker.notDone();
            return;
        }
        progTracker.madeProgress(true);
        processor.process(currInstream.ordinal(), inbox);
        if (!inbox.isEmpty()) {
            progTracker.notDone();
        }
    }

    /**
     * Asks the processor to complete, unless it already has or the outbox is
     * at its high-water mark. When the processor reports completion, a
     * {@code DONE_ITEM} is added to the outbox for every outbound stream.
     */
    private void completeIfNeeded() {
        if (processorCompleted) {
            return;
        }
        if (outbox.isHighWater()) {
            progTracker.notDone();
            return;
        }
        progTracker.madeProgress(true);
        processorCompleted = processor.complete();
        if (!processorCompleted) {
            progTracker.notDone();
            return;
        }
        for (OutboundEdgeStream outstream : outstreams) {
            outbox.add(outstream.ordinal(), DoneItem.DONE_ITEM);
        }
    }

    /** Attempts to flush every outbox bucket to its outbound collector. */
    private void tryFlushOutbox() {
        for (int ordinal = 0; ordinal < outbox.bucketCount(); ordinal++) {
            flushBucket(ordinal);
        }
    }

    /**
     * Moves items from the given outbox bucket to the collector of the outbound
     * stream at the same index, in queue order. A {@code DONE_ITEM} closes the
     * collector instead of being offered to it. Stops at the first item for
     * which the collector does not report "done", leaving that item at the head
     * of the bucket and marking the tracker "not done".
     *
     * @param ordinal index of the outbox bucket and of the outbound stream
     */
    private void flushBucket(int ordinal) {
        Queue<Object> bucket = outbox.queueWithOrdinal(ordinal);
        for (Object head = bucket.peek(); head != null; head = bucket.peek()) {
            ProgressState state;
            if (head == DoneItem.DONE_ITEM) {
                state = outstreams[ordinal].getCollector().close();
            } else {
                state = outstreams[ordinal].getCollector().offer(head);
            }
            progTracker.madeProgress(state.isMadeProgress());
            if (!state.isDone()) {
                progTracker.notDone();
                return;
            }
            bucket.remove();
        }
    }

    @Override
    public String toString() {
        return "ProcessorTasklet{vertex=" + vertexName + ", processor=" + processor + "}";
    }

    /** An {@link Inbox} backed directly by an {@link ArrayDeque}. */
    private static final class ArrayDequeInbox
    extends ArrayDeque<Object>
    implements Inbox {
        private ArrayDequeInbox() {
        }
    }
}

