/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.Pipe;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.execution.BroadcastItem;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.SnapshotBarrier;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.util.function.Predicate;
import java.util.Arrays;
import java.util.BitSet;
import java.util.function.Consumer;

/**
 * {@link InboundEdgeStream} that drains all queues of a single
 * {@link ConcurrentConveyor} into one consumer.
 *
 * <p>Ordinary items are passed straight through. The three special items
 * ({@link Watermark}, {@link SnapshotBarrier} and {@link DoneItem#DONE_ITEM})
 * are intercepted per queue and coalesced:
 * <ul>
 *   <li>a watermark is emitted when the minimum of the last watermarks
 *       observed on all queues rises above the last emitted one;</li>
 *   <li>a snapshot barrier is emitted once every still-active queue has
 *       delivered the barrier for the pending snapshot;</li>
 *   <li>a done item removes its queue from the conveyor; the stream is done
 *       when no active queue is left.</li>
 * </ul>
 */
public class ConcurrentInboundEdgeStream
implements InboundEdgeStream {
    private final int ordinal;
    private final int priority;
    private final ConcurrentConveyor<Object> conveyor;
    private final ProgressTracker tracker = new ProgressTracker();
    private final ItemDetector itemDetector = new ItemDetector();
    /** When set, a queue that already delivered the pending barrier is not drained further. */
    private final boolean waitForSnapshot;
    /** Last watermark observed on each queue; {@code Long.MAX_VALUE} once the queue is done. */
    private final long[] queueWms;
    /** Bit {@code i} is set while queue {@code i} has delivered the pending barrier. */
    private final BitSet receivedBarriers;
    private long pendingSnapshotId;
    private long lastEmittedWm = Long.MIN_VALUE;
    private long numActiveQueues;

    /**
     * @param conveyor        conveyor whose queues are drained
     * @param ordinal         value returned by {@link #ordinal()}
     * @param priority        value returned by {@link #priority()}
     * @param lastSnapshotId  the first expected barrier carries {@code lastSnapshotId + 1}
     * @param waitForSnapshot whether to stop draining a queue after it delivered
     *                        the pending barrier until the barrier is forwarded
     */
    public ConcurrentInboundEdgeStream(ConcurrentConveyor<Object> conveyor, int ordinal, int priority, long lastSnapshotId, boolean waitForSnapshot) {
        this.conveyor = conveyor;
        this.ordinal = ordinal;
        this.priority = priority;
        this.waitForSnapshot = waitForSnapshot;
        this.queueWms = new long[conveyor.queueCount()];
        Arrays.fill(this.queueWms, Long.MIN_VALUE);
        this.numActiveQueues = conveyor.queueCount();
        this.receivedBarriers = new BitSet(conveyor.queueCount());
        this.pendingSnapshotId = lastSnapshotId + 1L;
    }

    @Override
    public int ordinal() {
        return this.ordinal;
    }

    @Override
    public int priority() {
        return this.priority;
    }

    /**
     * Drains the conveyor's queues into {@code dest}, one queue after another.
     *
     * <p>The call ends early as soon as it emits a coalesced watermark or a
     * snapshot barrier, so at most one such synthesized item is emitted per
     * call and it is always the last item passed to {@code dest}.
     *
     * @param dest receiver of the drained items
     * @return the merged progress of all queues drained during this call; it
     *         reports "not done" while at least one queue is still active
     * @throws JetException if a queue delivers a non-increasing watermark or
     *                      a barrier with an unexpected snapshot id
     */
    @Override
    public ProgressState drainTo(Consumer<Object> dest) {
        this.tracker.reset();
        if (this.isPendingBarrierComplete()) {
            // A previous call completed the barrier set (typically because a queue
            // finished) but ended by emitting a watermark instead. Forward the
            // barrier now; otherwise it would never be emitted and, with
            // waitForSnapshot, every remaining queue would stay blocked.
            this.forwardPendingBarrier(dest);
            this.tracker.mergeWith(ProgressState.valueOf(true, false));
            return this.tracker.toProgressState();
        }
        for (int queueIndex = 0; queueIndex < this.conveyor.queueCount(); ++queueIndex) {
            QueuedPipe<Object> q = this.conveyor.queue(queueIndex);
            if (q == null) {
                // queue was removed after delivering its done item
                continue;
            }
            if (this.waitForSnapshot && this.receivedBarriers.get(queueIndex)) {
                // hold this queue back until the pending barrier has been forwarded
                continue;
            }
            this.drainQueue(q, dest);
            BroadcastItem item = this.itemDetector.item;
            if (item == DoneItem.DONE_ITEM) {
                this.conveyor.removeQueue(queueIndex);
                this.receivedBarriers.clear(queueIndex);
                // a finished queue must no longer hold back the coalesced watermark
                this.queueWms[queueIndex] = Long.MAX_VALUE;
                --this.numActiveQueues;
            } else if (item instanceof Watermark) {
                this.observeWm(queueIndex, ((Watermark)item).timestamp());
            } else if (item instanceof SnapshotBarrier) {
                this.observeBarrier(queueIndex, ((SnapshotBarrier)item).snapshotId());
            }
            if (this.numActiveQueues == 0L) {
                return this.tracker.toProgressState();
            }
            if (item == null) {
                // no special item seen: neither watermark nor barrier state changed
                continue;
            }
            long bottomWm = this.bottomObservedWm();
            if (bottomWm > this.lastEmittedWm) {
                this.lastEmittedWm = bottomWm;
                dest.accept(new Watermark(bottomWm));
                // a barrier that became complete at the same time is forwarded
                // at the start of the next call
                break;
            }
            if (this.isPendingBarrierComplete()) {
                this.forwardPendingBarrier(dest);
                break;
            }
        }
        if (this.numActiveQueues > 0L) {
            this.tracker.notDone();
        }
        return this.tracker.toProgressState();
    }

    @Override
    public boolean isDone() {
        return this.numActiveQueues == 0L;
    }

    /**
     * Tells whether every still-active queue has delivered the pending barrier.
     * Always {@code false} when no queue is active.
     */
    private boolean isPendingBarrierComplete() {
        return this.numActiveQueues > 0L && (long)this.receivedBarriers.cardinality() == this.numActiveQueues;
    }

    /** Emits the pending snapshot barrier to {@code dest} and starts expecting the next snapshot id. */
    private void forwardPendingBarrier(Consumer<Object> dest) {
        dest.accept(new SnapshotBarrier(this.pendingSnapshotId));
        ++this.pendingSnapshotId;
        this.receivedBarriers.clear();
    }

    /**
     * Drains one queue through the item detector and merges the outcome into
     * the tracker. After the call {@code itemDetector.item} holds the special
     * item the detector intercepted, or {@code null} if there was none.
     */
    private void drainQueue(Pipe<Object> queue, Consumer<Object> dest) {
        this.itemDetector.reset(dest);
        int drainedCount = queue.drain(this.itemDetector);
        this.tracker.mergeWith(ProgressState.valueOf(drainedCount > 0, this.itemDetector.item == DoneItem.DONE_ITEM));
        // do not keep a reference to the consumer between calls
        this.itemDetector.dest = null;
    }

    /**
     * Records that the given queue delivered a barrier.
     *
     * @throws JetException if {@code snapshotId} is not the pending snapshot id
     */
    private void observeBarrier(int queueIndex, long snapshotId) {
        if (snapshotId != this.pendingSnapshotId) {
            throw new JetException("Unexpected snapshot barrier " + snapshotId + ", expected " + this.pendingSnapshotId);
        }
        this.receivedBarriers.set(queueIndex);
    }

    /**
     * Records the latest watermark of the given queue.
     *
     * @throws JetException if {@code wmValue} is not strictly greater than the
     *                      previous watermark of that queue
     */
    private void observeWm(int queueIndex, long wmValue) {
        if (this.queueWms[queueIndex] >= wmValue) {
            throw new JetException("Watermarks not monotonically increasing on queue: last one=" + this.queueWms[queueIndex] + ", new one=" + wmValue);
        }
        this.queueWms[queueIndex] = wmValue;
    }

    /** Returns the smallest entry of {@link #queueWms}; the array must not be empty. */
    private long bottomObservedWm() {
        long min = this.queueWms[0];
        for (int i = 1; i < this.queueWms.length; ++i) {
            if (this.queueWms[i] < min) {
                min = this.queueWms[i];
            }
        }
        return min;
    }

    /**
     * Drain handler that passes ordinary items to {@link #dest} and intercepts
     * the first {@link Watermark}, {@link SnapshotBarrier} or
     * {@link DoneItem#DONE_ITEM}, storing it in {@link #item} instead of
     * forwarding it.
     */
    private static final class ItemDetector
    implements Predicate<Object> {
        Consumer<Object> dest;
        BroadcastItem item;

        private ItemDetector() {
        }

        /** Prepares the detector for draining another queue into {@code newDest}. */
        void reset(Consumer<Object> newDest) {
            this.dest = newDest;
            this.item = null;
        }

        /**
         * @return {@code false} after storing a special item, {@code true}
         *         after forwarding an ordinary one (presumably {@code false}
         *         makes {@code Pipe.drain} stop; confirm against its contract)
         */
        @Override
        public boolean test(Object o) {
            if (o instanceof Watermark || o instanceof SnapshotBarrier || o == DoneItem.DONE_ITEM) {
                assert (this.item == null) : "Received multiple special items without a call to reset(): " + this.item;
                this.item = (BroadcastItem)o;
                return false;
            }
            this.dest.accept(o);
            return true;
        }
    }
}

