/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.Processor;
import com.hazelcast.jet.Watermark;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.OutboundEdgeStream;
import com.hazelcast.jet.impl.execution.ProcessorTaskletBase;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.util.ArrayDequeOutbox;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.util.Preconditions;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

public class CooperativeProcessorTasklet
extends ProcessorTaskletBase {
    private final ArrayDequeOutbox outbox;
    private boolean processorCompleted;

    public CooperativeProcessorTasklet(Contexts.ProcCtx context, Processor processor, List<InboundEdgeStream> instreams, List<OutboundEdgeStream> outstreams) {
        super(context, processor, instreams, outstreams);
        Preconditions.checkTrue(processor.isCooperative(), "Processor is non-cooperative");
        int[] bucketCapacities = Stream.of(this.outstreams).mapToInt(OutboundEdgeStream::getOutboxCapacity).toArray();
        this.outbox = new ArrayDequeOutbox(bucketCapacities, this.progTracker);
    }

    @Override
    public final boolean isCooperative() {
        return true;
    }

    @Override
    public void init(CompletableFuture<Void> jobFuture) {
        this.initProcessor(this.outbox, jobFuture);
    }

    @Override
    @Nonnull
    public ProgressState call() {
        this.progTracker.reset();
        if (!this.inbox().isEmpty()) {
            this.progTracker.notDone();
        } else {
            if (!this.processor.tryProcess()) {
                this.tryFlushOutbox();
                this.progTracker.notDone();
                return this.progTracker.toProgressState();
            }
            this.tryFillInbox();
        }
        if (this.progTracker.isDone()) {
            this.completeIfNeeded();
        } else if (!this.inbox().isEmpty()) {
            this.processor.process(this.currInstream.ordinal(), this.inbox());
        }
        this.tryFlushOutbox();
        return this.progTracker.toProgressState();
    }

    private void completeIfNeeded() {
        if (this.processorCompleted) {
            return;
        }
        this.processorCompleted = this.processor.complete();
        if (this.processorCompleted) {
            this.outbox.addIgnoringCapacity(DoneItem.DONE_ITEM);
            return;
        }
        this.progTracker.notDone();
    }

    private void tryFlushOutbox() {
        block0: for (int i = 0; i < this.outbox.bucketCount(); ++i) {
            Object item;
            Queue<Object> q = this.outbox.queueWithOrdinal(i);
            while ((item = q.peek()) != null) {
                OutboundCollector c = this.outstreams[i].getCollector();
                ProgressState state = item instanceof Watermark || item instanceof DoneItem ? c.offerBroadcast(item) : c.offer(item);
                this.progTracker.madeProgress(state.isMadeProgress());
                if (!state.isDone()) {
                    this.progTracker.notDone();
                    continue block0;
                }
                q.remove();
            }
        }
    }
}

