/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Outbox;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.Watermark;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.ExecutionService;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.OutboundEdgeStream;
import com.hazelcast.jet.impl.execution.ProcessorTaskletBase;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.util.Preconditions;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

public class BlockingProcessorTasklet
extends ProcessorTaskletBase {
    private final BlockingOutbox outbox;
    private CompletableFuture<?> jobFuture;

    public BlockingProcessorTasklet(Contexts.ProcCtx context, Processor processor, List<InboundEdgeStream> instreams, List<OutboundEdgeStream> outstreams) {
        super(context, processor, instreams, outstreams);
        Preconditions.checkFalse(processor.isCooperative(), "Processor is cooperative");
        this.outbox = new BlockingOutbox();
    }

    @Override
    public final boolean isCooperative() {
        return false;
    }

    @Override
    public void init(CompletableFuture<Void> jobFuture) {
        super.init(jobFuture);
        this.jobFuture = jobFuture;
        this.initProcessor(this.outbox, jobFuture);
    }

    @Override
    @Nonnull
    public ProgressState call() {
        try {
            this.progTracker.reset();
            if (this.inbox().isEmpty()) {
                this.callNullaryProcess();
                this.tryFillInbox();
            } else {
                this.progTracker.notDone();
            }
            if (this.progTracker.isDone()) {
                this.complete();
            } else if (!this.inbox().isEmpty()) {
                this.processor.process(this.currInstream.ordinal(), this.inbox());
            }
            return this.progTracker.toProgressState();
        }
        catch (JobFutureCompleted e) {
            return ProgressState.DONE;
        }
    }

    private void callNullaryProcess() {
        if (!this.processor.tryProcess()) {
            throw new JetException("Non-cooperative processor's tryProcess() returned false: " + this.processor);
        }
    }

    private void complete() {
        if (this.processor.complete()) {
            this.outbox.add(DoneItem.DONE_ITEM);
        } else {
            this.progTracker.notDone();
        }
    }

    private static class JobFutureCompleted
    extends RuntimeException {
        private JobFutureCompleted() {
        }
    }

    private class BlockingOutbox
    implements Outbox {
        private BlockingOutbox() {
        }

        @Override
        public int bucketCount() {
            return BlockingProcessorTasklet.this.outstreams.length;
        }

        @Override
        public boolean offer(int ordinal, @Nonnull Object item) {
            BlockingProcessorTasklet.this.progTracker.madeProgress();
            if (ordinal != -1) {
                this.submit(BlockingProcessorTasklet.this.outstreams[ordinal], item);
            } else {
                for (OutboundEdgeStream outstream : BlockingProcessorTasklet.this.outstreams) {
                    this.submit(outstream, item);
                }
            }
            return true;
        }

        @Override
        public boolean offer(int[] ordinals, @Nonnull Object item) {
            BlockingProcessorTasklet.this.progTracker.madeProgress();
            for (int ord : ordinals) {
                this.submit(BlockingProcessorTasklet.this.outstreams[ord], item);
            }
            return true;
        }

        void add(@Nonnull Object item) {
            boolean accepted = BlockingProcessorTasklet.this.outbox.offer(item);
            assert (accepted) : "Blocking outbox refused an item: " + item;
        }

        private void submit(OutboundEdgeStream outstream, @Nonnull Object item) {
            OutboundCollector collector = outstream.getCollector();
            long idleCount = 0L;
            while (true) {
                ProgressState result;
                ProgressState progressState = result = item instanceof Watermark || item instanceof DoneItem ? collector.offerBroadcast(item) : collector.offer(item);
                if (result.isDone()) {
                    return;
                }
                if (BlockingProcessorTasklet.this.jobFuture.isDone()) {
                    throw new JobFutureCompleted();
                }
                if (result.isMadeProgress()) {
                    idleCount = 0L;
                    continue;
                }
                ExecutionService.IDLER.idle(++idleCount);
            }
        }
    }
}

