/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.Pipe;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.execution.BroadcastItem;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.SnapshotBarrier;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.util.function.Predicate;
import java.util.BitSet;
import java.util.function.Consumer;

/**
 * An {@link InboundEdgeStream} that drains the queues of a
 * {@link ConcurrentConveyor} into a single destination consumer.
 *
 * <p>Ordinary items are passed straight to the destination. Three kinds of
 * special item are intercepted instead of being forwarded as-is:
 * <ul>
 *   <li>{@link Watermark} - reported to the {@link WatermarkCoalescer}; a
 *       watermark is emitted only when the coalescer returns a timestamp
 *       other than {@code Long.MIN_VALUE};</li>
 *   <li>{@link SnapshotBarrier} - recorded per queue; one barrier is emitted
 *       once the number of recorded barriers equals the number of active
 *       queues;</li>
 *   <li>{@code DoneItem.DONE_ITEM} - the queue is removed from the conveyor
 *       and no longer counted as active.</li>
 * </ul>
 *
 * <p>The class keeps unsynchronized mutable state (pending snapshot id,
 * received barriers, active queue count).
 */
public class ConcurrentInboundEdgeStream implements InboundEdgeStream {
    private final int ordinal;
    private final int priority;
    /** When set, a queue whose barrier was already received is not drained again. */
    private final boolean waitForSnapshot;
    private final ConcurrentConveyor<Object> conveyor;
    private final ProgressTracker tracker = new ProgressTracker();
    private final ItemDetector itemDetector = new ItemDetector();
    private final WatermarkCoalescer watermarkCoalescer;
    /** Bit {@code i} is set once queue {@code i} delivered the pending snapshot barrier. */
    private final BitSet receivedBarriers;
    private final ILogger logger;
    /** Id of the only snapshot barrier currently accepted from the queues. */
    private long pendingSnapshotId;
    /** Number of queues that have not yet delivered the done item. */
    private long numActiveQueues;

    /**
     * Creates the stream over the given conveyor.
     *
     * @param conveyor                 source of the queues to drain
     * @param ordinal                  value returned by {@link #ordinal()}
     * @param priority                 value returned by {@link #priority()}
     * @param lastSnapshotId           the first accepted barrier id is this value plus one
     * @param waitForSnapshot          whether to stop draining a queue after its barrier arrived
     * @param maxWatermarkRetainMillis passed through to {@code WatermarkCoalescer.create}
     * @param debugName                suffix appended to the logger name
     */
    public ConcurrentInboundEdgeStream(ConcurrentConveyor<Object> conveyor, int ordinal, int priority, long lastSnapshotId, boolean waitForSnapshot, int maxWatermarkRetainMillis, String debugName) {
        this.conveyor = conveyor;
        this.ordinal = ordinal;
        this.priority = priority;
        this.waitForSnapshot = waitForSnapshot;

        final int queueCount = conveyor.queueCount();
        this.watermarkCoalescer = WatermarkCoalescer.create(maxWatermarkRetainMillis, queueCount);
        this.numActiveQueues = queueCount;
        this.receivedBarriers = new BitSet(queueCount);
        this.pendingSnapshotId = lastSnapshotId + 1L;

        String loggerName = ConcurrentInboundEdgeStream.class.getName() + "." + debugName;
        this.logger = Logger.getLogger(loggerName);
    }

    @Override
    public int ordinal() {
        return ordinal;
    }

    @Override
    public int priority() {
        return priority;
    }

    /**
     * Drains the queues using the time reported by the watermark coalescer
     * as "now".
     */
    @Override
    public ProgressState drainTo(Consumer<Object> dest) {
        long now = watermarkCoalescer.getTime();
        return drainTo(now, dest);
    }

    /**
     * Makes one pass over the conveyor's queues, forwarding items to
     * {@code dest}.
     *
     * <p>The pass ends early, returning immediately, as soon as a watermark
     * or a snapshot barrier has been emitted to {@code dest}, or when the
     * last active queue is done.
     *
     * @param now  timestamp handed to the watermark coalescer
     * @param dest receiver of the drained items
     * @return the progress made during this pass
     */
    ProgressState drainTo(long now, Consumer<Object> dest) {
        tracker.reset();
        for (int queueIndex = 0; queueIndex < conveyor.queueCount(); queueIndex++) {
            QueuedPipe<Object> queue = conveyor.queue(queueIndex);
            if (queue == null) {
                // this queue was removed earlier, after it delivered the done item
                continue;
            }
            if (waitForSnapshot && receivedBarriers.get(queueIndex)) {
                // barrier already seen here; hold this queue back until the barrier is forwarded
                continue;
            }

            ProgressState queueResult = drainQueue(queue, dest);
            tracker.mergeWith(queueResult);

            // the special item (if any) the detector captured while draining this queue
            BroadcastItem special = itemDetector.item;
            if (special == DoneItem.DONE_ITEM) {
                conveyor.removeQueue(queueIndex);
                receivedBarriers.clear(queueIndex);
                numActiveQueues--;
                long coalescedWm = watermarkCoalescer.queueDone(queueIndex);
                if (maybeEmitWm(coalescedWm, dest)) {
                    if (numActiveQueues == 0L) {
                        return ProgressState.DONE;
                    }
                    return ProgressState.MADE_PROGRESS;
                }
            } else if (special instanceof Watermark) {
                long wmTimestamp = ((Watermark) special).timestamp();
                long coalescedWm = watermarkCoalescer.observeWm(now, queueIndex, wmTimestamp);
                boolean forwarded = maybeEmitWm(coalescedWm, dest);
                if (logger.isFinestEnabled()) {
                    logger.finest("Received " + special + " from queue " + queueIndex + '/' + conveyor.queueCount() + (forwarded ? ", forwarded" : ", not forwarded"));
                }
                if (forwarded) {
                    return ProgressState.MADE_PROGRESS;
                }
            } else if (special instanceof SnapshotBarrier) {
                long snapshotId = ((SnapshotBarrier) special).snapshotId();
                observeBarrier(queueIndex, snapshotId);
            } else if (queueResult.isMadeProgress()) {
                // only ordinary items were drained from this queue
                watermarkCoalescer.observeEvent(queueIndex);
            }

            if (numActiveQueues == 0L) {
                return tracker.toProgressState();
            }

            // A special item may have completed the barrier set: either a barrier
            // arrived, or a queue finished and lowered the number of active queues.
            if (special != null && receivedBarriers.cardinality() == numActiveQueues) {
                dest.accept(new SnapshotBarrier(pendingSnapshotId));
                pendingSnapshotId++;
                receivedBarriers.clear();
                return ProgressState.MADE_PROGRESS;
            }
        }

        long historicWm = watermarkCoalescer.checkWmHistory(now);
        if (maybeEmitWm(historicWm, dest)) {
            return ProgressState.MADE_PROGRESS;
        }
        if (numActiveQueues > 0L) {
            tracker.notDone();
        }
        return tracker.toProgressState();
    }

    /**
     * Emits a {@link Watermark} with the given timestamp to {@code dest},
     * unless the timestamp is {@code Long.MIN_VALUE}.
     *
     * @return {@code true} if a watermark was emitted
     */
    private boolean maybeEmitWm(long timestamp, Consumer<Object> dest) {
        if (timestamp == Long.MIN_VALUE) {
            return false;
        }
        dest.accept(new Watermark(timestamp));
        return true;
    }

    @Override
    public boolean isDone() {
        return numActiveQueues == 0L;
    }

    /**
     * Drains one queue through the item detector, which forwards ordinary
     * items to {@code dest} and captures a special item in its {@code item}
     * field.
     *
     * @return a state built from "at least one item was drained" and
     *         "the captured item is the done item"
     */
    private ProgressState drainQueue(Pipe<Object> queue, Consumer<Object> dest) {
        itemDetector.reset(dest);
        int drainedCount = queue.drain(itemDetector);
        // do not hold on to the caller's consumer between drains
        itemDetector.dest = null;

        boolean drainedAnything = drainedCount > 0;
        boolean queueIsDone = itemDetector.item == DoneItem.DONE_ITEM;
        return ProgressState.valueOf(drainedAnything, queueIsDone);
    }

    /**
     * Records that the queue delivered the barrier of the pending snapshot.
     *
     * @throws JetException if the barrier belongs to any other snapshot
     */
    private void observeBarrier(int queueIndex, long snapshotId) {
        if (snapshotId == pendingSnapshotId) {
            receivedBarriers.set(queueIndex);
            return;
        }
        throw new JetException("Unexpected snapshot barrier " + snapshotId + ", expected " + pendingSnapshotId);
    }

    /**
     * Predicate handed to {@code Pipe.drain}. Ordinary items are passed to
     * {@link #dest} and accepted; a watermark, snapshot barrier or done item
     * is stored in {@link #item} and rejected (presumably ending the drain -
     * confirm against the {@code Pipe.drain} contract).
     */
    private static final class ItemDetector implements Predicate<Object> {
        Consumer<Object> dest;
        BroadcastItem item;

        private ItemDetector() {
        }

        /** Installs the destination for the next drain and forgets the last special item. */
        void reset(Consumer<Object> newDest) {
            dest = newDest;
            item = null;
        }

        @Override
        public boolean test(Object o) {
            boolean isSpecial = o instanceof Watermark || o instanceof SnapshotBarrier || o == DoneItem.DONE_ITEM;
            if (!isSpecial) {
                dest.accept(o);
                return true;
            }
            assert (item == null) : "Received multiple special items without a call to reset(): " + item;
            item = (BroadcastItem) o;
            return false;
        }
    }
}

