/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.execution.BroadcastItem;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.OutboundEdgeStream;
import com.hazelcast.jet.impl.execution.OutboxImpl;
import com.hazelcast.jet.impl.execution.ProcessorState;
import com.hazelcast.jet.impl.execution.SnapshotBarrier;
import com.hazelcast.jet.impl.execution.SnapshotContext;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import com.hazelcast.jet.impl.execution.init.Contexts;
import com.hazelcast.jet.impl.util.ArrayDequeInbox;
import com.hazelcast.jet.impl.util.CircularListCursor;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.util.Preconditions;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.TreeMap;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/**
 * Tasklet that drives a single {@link Processor}.
 *
 * <p>Each invocation of {@link #call()} performs one step of a small state
 * machine (see {@link #stateMachineStep(long)}): it drains the inbound edge
 * streams into an inbox, hands the inbox to the processor, and forwards
 * watermarks, snapshot barriers and the final done-item through the outbox.
 * The current state is kept in {@link #state} between invocations.
 */
public class ProcessorTasklet
implements Tasklet {
    /**
     * Last argument handed to the {@link OutboxImpl} constructor
     * (presumably the outbox batch size limit; confirm against OutboxImpl).
     */
    private static final int OUTBOX_BATCH_SIZE = 2048;

    /**
     * Value that this class treats as "no watermark to forward" when it is
     * returned by {@code checkWmHistory}, {@code observeWm} or
     * {@code queueDone} of the {@link WatermarkCoalescer}.
     */
    private static final long NO_NEW_WM = Long.MIN_VALUE;

    // NOTE: progTracker must be declared before inbox, whose initializer reads it.
    private final ProgressTracker progTracker = new ProgressTracker();
    /** Outbound streams, sorted by ordinal. */
    private final OutboundEdgeStream[] outstreams;
    private final OutboxImpl outbox;
    private final Contexts.ProcCtx context;
    private final Processor processor;
    private final SnapshotContext ssContext;
    /** Bit per inbound ordinal; set once a barrier for {@link #pendingSnapshotId} was drained from it. */
    private final BitSet receivedBarriers;
    private final ArrayDequeInbox inbox = new ArrayDequeInbox(this.progTracker);
    /** Inbound streams grouped by priority, queued in ascending priority order. */
    private final Queue<ArrayList<InboundEdgeStream>> instreamGroupQueue;
    private final WatermarkCoalescer watermarkCoalescer;
    /** Number of inbound streams that have not reported "done" yet. */
    private int numActiveOrdinals;
    /** Cursor over the priority group currently being drained; {@code null} once all groups are consumed. */
    private CircularListCursor<InboundEdgeStream> instreamCursor;
    /** Stream the inbox content was most recently drained from. */
    private InboundEdgeStream currInstream;
    private ProcessorState state;
    /** Id of the snapshot whose barriers are currently expected and will be emitted next. */
    private long pendingSnapshotId;
    /** Watermark waiting to be processed and emitted, or {@code null} if there is none. */
    private Watermark pendingWatermark;

    /**
     * Creates the tasklet.
     *
     * @param context                 processor context; also supplies the serialization service
     * @param processor               the processor to drive; must not be {@code null}
     * @param instreams               inbound edge streams; they are grouped by priority
     * @param outstreams              outbound edge streams; they are sorted by ordinal
     * @param ssContext               snapshot context; its last snapshot id + 1 becomes the
     *                                first expected snapshot id
     * @param ssCollector             collector placed after the edge collectors in the outbox;
     *                                a {@code null} value is tolerated and yields an outbox
     *                                without that extra collector
     * @param maxWatermarkRetainMillis passed to {@link WatermarkCoalescer#create}
     */
    public ProcessorTasklet(@Nonnull Contexts.ProcCtx context, @Nonnull Processor processor, @Nonnull List<? extends InboundEdgeStream> instreams, @Nonnull List<? extends OutboundEdgeStream> outstreams, @Nonnull SnapshotContext ssContext, @Nonnull OutboundCollector ssCollector, int maxWatermarkRetainMillis) {
        Preconditions.checkNotNull(processor, "processor");
        this.context = context;
        this.processor = processor;
        this.numActiveOrdinals = instreams.size();
        // TreeMap keeps the groups sorted by priority, so the queue yields
        // them in ascending priority order.
        this.instreamGroupQueue = instreams.stream()
                .collect(Collectors.groupingBy(InboundEdgeStream::priority, TreeMap::new,
                        Collectors.toCollection(ArrayList<InboundEdgeStream>::new)))
                .values().stream()
                .collect(Collectors.toCollection(ArrayDeque::new));
        this.outstreams = outstreams.stream()
                .sorted(Comparator.comparing(OutboundEdgeStream::ordinal))
                .toArray(OutboundEdgeStream[]::new);
        this.ssContext = ssContext;
        this.instreamCursor = popInstreamGroup();
        this.currInstream = this.instreamCursor != null ? this.instreamCursor.value() : null;
        // createOutbox reads this.outstreams and this.context, both assigned above.
        this.outbox = createOutbox(ssCollector);
        this.receivedBarriers = new BitSet(instreams.size());
        this.state = initialProcessingState();
        this.pendingSnapshotId = ssContext.lastSnapshotId() + 1L;
        this.watermarkCoalescer = WatermarkCoalescer.create(maxWatermarkRetainMillis, instreams.size());
    }

    /**
     * Builds the outbox from the collectors of all outbound streams, followed
     * by {@code ssCollector} when it is not {@code null}.
     */
    private OutboxImpl createOutbox(OutboundCollector ssCollector) {
        boolean hasSnapshotCollector = ssCollector != null;
        OutboundCollector[] collectors = new OutboundCollector[outstreams.length + (hasSnapshotCollector ? 1 : 0)];
        for (int i = 0; i < outstreams.length; ++i) {
            collectors[i] = outstreams[i].getCollector();
        }
        if (hasSnapshotCollector) {
            // the snapshot collector always occupies the last slot
            collectors[outstreams.length] = ssCollector;
        }
        return new OutboxImpl(collectors, hasSnapshotCollector, progTracker, context.getSerializationService(), OUTBOX_BATCH_SIZE);
    }

    /**
     * Passes the processor through the managed context, if one is configured,
     * and then initializes it with the outbox and the processor context.
     */
    @Override
    public void init() {
        if (context.getSerializationService().getManagedContext() != null) {
            Object processor2 = context.getSerializationService().getManagedContext().initialize(processor);
            assert (processor2 == processor) : "different object returned";
        }
        processor.init(outbox, context);
    }

    /**
     * Runs one step of the state machine, using the time reported by the
     * watermark coalescer as "now".
     */
    @Override
    @Nonnull
    public ProgressState call() {
        return call(watermarkCoalescer.getTime());
    }

    /**
     * Runs one step of the state machine with an explicit timestamp.
     *
     * @param now timestamp forwarded to the watermark coalescer
     * @return the progress recorded by the tracker during this step
     */
    ProgressState call(long now) {
        progTracker.reset();
        outbox.reset();
        stateMachineStep(now);
        return progTracker.toProgressState();
    }

    /**
     * Executes the action of the current {@link #state} and possibly moves to
     * the next state. The watermark-related states call this method
     * recursively so that a transition out of them is acted upon within the
     * same {@link #call(long)} invocation.
     *
     * @param now timestamp forwarded to the watermark coalescer
     * @throws JetException if {@link #state} has no handler here
     */
    private void stateMachineStep(long now) {
        switch (state) {
            case PROCESS_WATERMARK: {
                progTracker.notDone();
                if (pendingWatermark == null) {
                    long wm = watermarkCoalescer.checkWmHistory(now);
                    if (wm == NO_NEW_WM) {
                        // nothing to forward: continue with the inbox right away
                        state = ProcessorState.PROCESS_INBOX;
                        stateMachineStep(now);
                        return;
                    }
                    pendingWatermark = new Watermark(wm);
                }
                // The idle message is emitted without being offered to the processor.
                if (pendingWatermark.equals(WatermarkCoalescer.IDLE_MESSAGE)
                        || processor.tryProcessWatermark(pendingWatermark)) {
                    state = ProcessorState.EMIT_WATERMARK;
                    stateMachineStep(now);
                }
                return;
            }
            case EMIT_WATERMARK: {
                progTracker.notDone();
                if (outbox.offer(pendingWatermark)) {
                    state = ProcessorState.PROCESS_INBOX;
                    pendingWatermark = null;
                    stateMachineStep(now);
                }
                return;
            }
            case PROCESS_INBOX: {
                progTracker.notDone();
                // Refill only an empty inbox; for a non-snapshot inbox the
                // processor must first accept tryProcess().
                if (inbox.isEmpty() && (isSnapshotInbox() || processor.tryProcess())) {
                    fillInbox(now);
                }
                if (!inbox.isEmpty()) {
                    if (isSnapshotInbox()) {
                        processor.restoreFromSnapshot(inbox);
                    } else {
                        processor.process(currInstream.ordinal(), inbox);
                    }
                }
                // The processor may leave items in the inbox; the state only
                // changes once it has consumed everything.
                if (inbox.isEmpty()) {
                    if (currInstream != null && currInstream.isDone()) {
                        state = ProcessorState.COMPLETE_EDGE;
                        progTracker.madeProgress();
                        return;
                    }
                    // A barrier has arrived from every still-active ordinal.
                    if (context.snapshottingEnabled() && numActiveOrdinals > 0
                            && receivedBarriers.cardinality() == numActiveOrdinals) {
                        state = ProcessorState.SAVE_SNAPSHOT;
                        return;
                    }
                    if (numActiveOrdinals == 0) {
                        progTracker.madeProgress();
                        state = ProcessorState.COMPLETE;
                    } else {
                        state = ProcessorState.PROCESS_WATERMARK;
                    }
                }
                return;
            }
            case COMPLETE_EDGE: {
                progTracker.notDone();
                boolean edgeCompleted = isSnapshotInbox()
                        ? processor.finishSnapshotRestore()
                        : processor.completeEdge(currInstream.ordinal());
                if (edgeCompleted) {
                    progTracker.madeProgress();
                    state = initialProcessingState();
                }
                return;
            }
            case SAVE_SNAPSHOT: {
                assert (context.snapshottingEnabled()) : "Snapshotting is not enabled";
                progTracker.notDone();
                if (processor.saveToSnapshot()) {
                    progTracker.madeProgress();
                    state = ProcessorState.EMIT_BARRIER;
                }
                return;
            }
            case EMIT_BARRIER: {
                assert (context.snapshottingEnabled()) : "Snapshotting is not enabled";
                progTracker.notDone();
                if (outbox.offerToEdgesAndSnapshot(new SnapshotBarrier(pendingSnapshotId))) {
                    // barrier forwarded: start expecting the next snapshot
                    receivedBarriers.clear();
                    ++pendingSnapshotId;
                    state = initialProcessingState();
                }
                return;
            }
            case COMPLETE: {
                progTracker.notDone();
                if (context.snapshottingEnabled()) {
                    long currSnapshotId = ssContext.lastSnapshotId();
                    assert (currSnapshotId <= pendingSnapshotId) : "Unexpected new snapshot id " + currSnapshotId + ", current was " + pendingSnapshotId;
                    // The snapshot context has reached the id this tasklet is
                    // waiting for: save a snapshot before completing further.
                    if (currSnapshotId == pendingSnapshotId) {
                        state = ProcessorState.SAVE_SNAPSHOT;
                        progTracker.madeProgress();
                        return;
                    }
                }
                if (processor.complete()) {
                    progTracker.madeProgress();
                    state = ProcessorState.EMIT_DONE_ITEM;
                }
                return;
            }
            case EMIT_DONE_ITEM: {
                if (!outbox.offerToEdgesAndSnapshot(DoneItem.DONE_ITEM)) {
                    progTracker.notDone();
                    return;
                }
                // notDone() is deliberately not called on success
                state = ProcessorState.END;
                return;
            }
            default: {
                throw new JetException("Unexpected state: " + state);
            }
        }
    }

    /**
     * Drains inbound streams of the current priority group into the (empty)
     * inbox. Streams are visited starting at the cursor position until one of
     * them made progress, the cursor is back at the stream it started from,
     * or the cursor cannot advance any more, in which case the next priority
     * group is popped.
     *
     * <p>If the last drained item is a {@link Watermark} or a
     * {@link SnapshotBarrier}, it is removed from the inbox and reported to
     * the watermark coalescer or recorded in {@link #receivedBarriers}
     * respectively.
     *
     * @param now timestamp forwarded to the watermark coalescer
     */
    private void fillInbox(long now) {
        assert (inbox.isEmpty()) : "inbox is not empty";
        assert (pendingWatermark == null) : "null wm expected, but was " + pendingWatermark;
        if (instreamCursor == null) {
            return;
        }
        InboundEdgeStream first = instreamCursor.value();
        ProgressState result;
        do {
            currInstream = instreamCursor.value();
            result = ProgressState.NO_PROGRESS;
            // With the exactly-once guarantee, an ordinal that already
            // delivered its barrier is not drained.
            if (ssContext != null && ssContext.processingGuarantee() == ProcessingGuarantee.EXACTLY_ONCE
                    && receivedBarriers.get(currInstream.ordinal())) {
                instreamCursor.advance();
                // 'continue' in a do-while jumps to the loop condition below
                continue;
            }
            result = currInstream.drainTo(inbox.queue()::add);
            progTracker.madeProgress(result.isMadeProgress());

            // Inspect the last drained item for special handling.
            Object lastItem = inbox.queue().peekLast();
            if (lastItem instanceof Watermark) {
                long newWmValue = ((Watermark) inbox.queue().removeLast()).timestamp();
                long wm = watermarkCoalescer.observeWm(now, currInstream.ordinal(), newWmValue);
                if (wm != NO_NEW_WM) {
                    pendingWatermark = new Watermark(wm);
                }
            } else if (lastItem instanceof SnapshotBarrier) {
                SnapshotBarrier barrier = (SnapshotBarrier) inbox.queue().removeLast();
                observeSnapshot(currInstream.ordinal(), barrier.snapshotId());
            } else if (lastItem != null && !(lastItem instanceof BroadcastItem)) {
                watermarkCoalescer.observeEvent(currInstream.ordinal());
            }

            if (result.isDone()) {
                receivedBarriers.clear(currInstream.ordinal());
                long wm = watermarkCoalescer.queueDone(currInstream.ordinal());
                if (wm != NO_NEW_WM) {
                    // May replace a watermark set a few lines above in this
                    // same iteration; the assert requires the new one to be higher.
                    assert (pendingWatermark == null || pendingWatermark.timestamp() < wm) : "trying to assign lower WM. Old=" + pendingWatermark.timestamp() + ", new=" + wm;
                    pendingWatermark = new Watermark(wm);
                }
                instreamCursor.remove();
                --numActiveOrdinals;
            }

            // The cursor cannot advance: move on to the next priority group.
            if (!instreamCursor.advance()) {
                instreamCursor = popInstreamGroup();
                return;
            }
        } while (!result.isMadeProgress() && instreamCursor.value() != first);
    }

    /**
     * Takes the next priority group off the queue.
     *
     * @return a cursor over the next group, or {@code null} if the queue is empty
     */
    private CircularListCursor<InboundEdgeStream> popInstreamGroup() {
        ArrayList<InboundEdgeStream> group = instreamGroupQueue.poll();
        return group == null ? null : new CircularListCursor<>(group);
    }

    @Override
    public String toString() {
        return "ProcessorTasklet{" + context.vertexName() + '#' + context.globalProcessorIndex() + '}';
    }

    /**
     * Records that a barrier for the pending snapshot arrived on {@code ordinal}.
     *
     * @throws JetException if the barrier belongs to a different snapshot
     */
    private void observeSnapshot(int ordinal, long snapshotId) {
        if (snapshotId != pendingSnapshotId) {
            throw new JetException("Unexpected snapshot barrier " + snapshotId + " from ordinal " + ordinal + " expected " + pendingSnapshotId);
        }
        receivedBarriers.set(ordinal);
    }

    /**
     * Chooses the state to (re)enter normal processing with: a pending
     * watermark takes precedence; otherwise the tasklet completes when no
     * inbound group is left and processes the inbox when one is.
     */
    private ProcessorState initialProcessingState() {
        if (pendingWatermark != null) {
            return ProcessorState.PROCESS_WATERMARK;
        }
        return instreamCursor == null ? ProcessorState.COMPLETE : ProcessorState.PROCESS_INBOX;
    }

    /**
     * Tells whether the current inbound stream has priority
     * {@code Integer.MIN_VALUE}, which this class treats as the
     * snapshot-restore stream.
     */
    private boolean isSnapshotInbox() {
        return currInstream != null && currInstream.priority() == Integer.MIN_VALUE;
    }

    /** Delegates to {@link Processor#isCooperative()}. */
    @Override
    public boolean isCooperative() {
        return processor.isCooperative();
    }
}

