/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.Pipe;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.execution.BroadcastItem;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.SnapshotBarrier;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.util.BitSet;
import java.util.Collection;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;

/**
 * {@link InboundEdgeStream} implementation that drains every queue of a
 * {@link ConcurrentConveyor} into a single destination.
 *
 * <p>Besides ordinary items it recognises three kinds of special
 * {@link BroadcastItem}s arriving on the queues:
 * <ul>
 *   <li>{@link Watermark}s, which are passed through a
 *       {@link WatermarkCoalescer} before (possibly) being forwarded,</li>
 *   <li>{@link SnapshotBarrier}s, which are tracked per queue and forwarded
 *       once every active queue has delivered one,</li>
 *   <li>{@link DoneItem#DONE_ITEM}, which removes the queue from the conveyor
 *       and decrements the count of active queues.</li>
 * </ul>
 *
 * <p>NOTE(review): the class keeps unsynchronized mutable state, so it
 * presumably expects to be driven from a single thread — confirm with callers.
 */
public class ConcurrentInboundEdgeStream
implements InboundEdgeStream {
    private final int ordinal;
    private final int priority;
    private final ConcurrentConveyor<Object> conveyor;
    private final ProgressTracker tracker = new ProgressTracker();
    private final ItemDetector itemDetector = new ItemDetector();
    private final WatermarkCoalescer watermarkCoalescer;
    // Bit i is set while queue i has delivered the current barrier that has not been forwarded yet.
    private final BitSet receivedBarriers;
    private final ILogger logger;
    // When true, a queue that already delivered the current barrier is skipped by drainTo()
    // until the barrier is forwarded. Becomes true permanently once a terminal barrier is seen.
    private boolean waitForAllBarriers;
    // Barrier currently being collected from the queues; null when none is pending.
    private SnapshotBarrier currentBarrier;
    // Number of queues that have not yet delivered DONE_ITEM.
    private long numActiveQueues;

    /**
     * Creates a stream over all queues of the given conveyor.
     *
     * @param conveyor           source of the input queues
     * @param ordinal            value returned from {@link #ordinal()}
     * @param priority           value returned from {@link #priority()}
     * @param waitForAllBarriers initial value of the flag that makes {@link #drainTo}
     *                           skip queues which already delivered the pending barrier
     * @param debugName          suffix appended to the logger name
     */
    public ConcurrentInboundEdgeStream(ConcurrentConveyor<Object> conveyor, int ordinal, int priority, boolean waitForAllBarriers, String debugName) {
        this.conveyor = conveyor;
        this.ordinal = ordinal;
        this.priority = priority;
        this.waitForAllBarriers = waitForAllBarriers;
        this.watermarkCoalescer = WatermarkCoalescer.create(conveyor.queueCount());
        this.numActiveQueues = conveyor.queueCount();
        this.receivedBarriers = new BitSet(conveyor.queueCount());
        this.logger = Logger.getLogger(ConcurrentInboundEdgeStream.class.getName() + "." + debugName);
        this.logger.finest("Coalescing " + conveyor.queueCount() + " input queues");
    }

    @Override
    public int ordinal() {
        return this.ordinal;
    }

    @Override
    public int priority() {
        return this.priority;
    }

    /**
     * Drains the conveyor's queues, in index order, into {@code dest}.
     *
     * <p>The method returns early, without visiting the remaining queues, as
     * soon as it forwards a watermark or a snapshot barrier to {@code dest},
     * or when the last active queue becomes done.
     *
     * @param dest receiver of ordinary items, forwarded watermarks and barriers
     * @return the progress made by this call
     */
    @Override
    public ProgressState drainTo(Predicate<Object> dest) {
        this.tracker.reset();
        for (int queueIndex = 0; queueIndex < this.conveyor.queueCount(); ++queueIndex) {
            QueuedPipe<Object> q = this.conveyor.queue(queueIndex);
            // Skip removed queues and, when waiting for all barriers, queues
            // that already delivered the pending barrier.
            if (q == null || this.waitForAllBarriers && this.receivedBarriers.get(queueIndex)) {
                continue;
            }
            ProgressState result = this.drainQueue(q, dest);
            this.tracker.mergeWith(result);
            if (this.itemDetector.item == DoneItem.DONE_ITEM) {
                this.conveyor.removeQueue(queueIndex);
                this.receivedBarriers.clear(queueIndex);
                --this.numActiveQueues;
                // A queue finishing may allow the coalescer to release a watermark.
                long wmTimestamp = this.watermarkCoalescer.queueDone(queueIndex);
                if (this.maybeEmitWm(wmTimestamp, dest)) {
                    if (this.logger.isFinestEnabled()) {
                        this.logger.finest("Queue " + queueIndex + " is done, forwarding " + new Watermark(wmTimestamp));
                    }
                    return this.numActiveQueues == 0L ? ProgressState.DONE : ProgressState.MADE_PROGRESS;
                }
            } else if (this.itemDetector.item instanceof Watermark) {
                long wmTimestamp = ((Watermark)this.itemDetector.item).timestamp();
                boolean forwarded = this.maybeEmitWm(this.watermarkCoalescer.observeWm(queueIndex, wmTimestamp), dest);
                if (this.logger.isFinestEnabled()) {
                    // Both branches are plain status words; no "key=" form, since no value follows.
                    this.logger.finest("Received " + this.itemDetector.item + " from queue " + queueIndex + (forwarded ? ", forwarded" : ", not forwarded") + ", coalescedWm=" + Util.toLocalTime(this.watermarkCoalescer.coalescedWm()) + ", topObservedWm=" + Util.toLocalTime(this.topObservedWm()));
                }
                if (forwarded) {
                    return ProgressState.MADE_PROGRESS;
                }
            } else if (this.itemDetector.item instanceof SnapshotBarrier) {
                this.observeBarrier(queueIndex, (SnapshotBarrier)this.itemDetector.item);
            } else if (result.isMadeProgress()) {
                // Only ordinary items were drained from this queue.
                this.watermarkCoalescer.observeEvent(queueIndex);
            }
            if (this.numActiveQueues == 0L) {
                return this.tracker.toProgressState();
            }
            // Barrier forwarding is only re-evaluated after a special item was seen
            // and only fires once every active queue has delivered the barrier.
            if (this.itemDetector.item == null || (long)this.receivedBarriers.cardinality() != this.numActiveQueues) {
                continue;
            }
            assert (this.currentBarrier != null) : "currentBarrier == null";
            boolean res = dest.test(this.currentBarrier);
            assert (res) : "test result expected to be true";
            this.currentBarrier = null;
            this.receivedBarriers.clear();
            return ProgressState.MADE_PROGRESS;
        }
        // No queue triggered an early return; ask the coalescer whether its history yields a watermark.
        if (this.maybeEmitWm(this.watermarkCoalescer.checkWmHistory(), dest)) {
            return ProgressState.MADE_PROGRESS;
        }
        if (this.numActiveQueues > 0L) {
            this.tracker.notDone();
        }
        return this.tracker.toProgressState();
    }

    /**
     * Forwards a new {@link Watermark} with the given timestamp to {@code dest},
     * unless the timestamp is {@link Long#MIN_VALUE}, which is treated as
     * "nothing to emit".
     *
     * @param timestamp watermark timestamp, or {@code Long.MIN_VALUE} for none
     * @param dest      receiver of the watermark
     * @return {@code true} if a watermark was passed to {@code dest}
     */
    private boolean maybeEmitWm(long timestamp, Predicate<Object> dest) {
        if (timestamp != Long.MIN_VALUE) {
            boolean res = dest.test(new Watermark(timestamp));
            assert (res) : "test result expected to be true";
            return true;
        }
        return false;
    }

    /**
     * @return {@code true} once no active queues remain
     */
    @Override
    public boolean isDone() {
        return this.numActiveQueues == 0L;
    }

    /**
     * Drains a single queue through the {@link ItemDetector}, which passes
     * ordinary items on to {@code dest} and captures a special item in
     * {@code itemDetector.item}.
     *
     * @param queue the queue to drain
     * @param dest  receiver of ordinary items
     * @return {@code ProgressState.valueOf(drainedCount > 0, item == DONE_ITEM)}
     */
    private ProgressState drainQueue(Pipe<Object> queue, Predicate<Object> dest) {
        this.itemDetector.reset(dest);
        int drainedCount = queue.drain(this.itemDetector);
        // Drop the reference to the caller's destination between calls.
        this.itemDetector.dest = null;
        return ProgressState.valueOf(drainedCount > 0, this.itemDetector.item == DoneItem.DONE_ITEM);
    }

    /**
     * Records that {@code barrier} arrived on the queue with the given index.
     * The first barrier observed becomes the pending one; later ones are
     * asserted to be equal to it. A terminal barrier switches the stream to
     * waiting for all barriers.
     *
     * @param queueIndex index of the queue that delivered the barrier
     * @param barrier    the barrier received
     */
    private void observeBarrier(int queueIndex, SnapshotBarrier barrier) {
        if (this.currentBarrier == null) {
            this.currentBarrier = barrier;
        } else {
            assert (this.currentBarrier.equals(barrier)) : this.currentBarrier + " != " + barrier;
        }
        if (barrier.isTerminal()) {
            this.waitForAllBarriers = true;
        }
        this.receivedBarriers.set(queueIndex);
    }

    /**
     * @return the sum of the sizes of all queues still present in the conveyor
     */
    @Override
    public int sizes() {
        return this.conveyorSum(Collection::size);
    }

    /**
     * @return the sum of the capacities of all queues still present in the conveyor
     */
    @Override
    public int capacities() {
        return this.conveyorSum(Pipe::capacity);
    }

    /**
     * Sums {@code toIntF} over every non-null queue of the conveyor.
     *
     * @param toIntF function extracting an {@code int} from a queue
     * @return the sum of the extracted values
     */
    private int conveyorSum(ToIntFunction<QueuedPipe<Object>> toIntF) {
        int sum = 0;
        for (int queueIndex = 0; queueIndex < this.conveyor.queueCount(); ++queueIndex) {
            QueuedPipe<Object> q = this.conveyor.queue(queueIndex);
            if (q == null) {
                continue;
            }
            sum += toIntF.applyAsInt(q);
        }
        return sum;
    }

    /**
     * @return the value of {@link WatermarkCoalescer#topObservedWm()}
     */
    @Override
    public long topObservedWm() {
        return this.watermarkCoalescer.topObservedWm();
    }

    /**
     * @return the value of {@link WatermarkCoalescer#coalescedWm()}
     */
    @Override
    public long coalescedWm() {
        return this.watermarkCoalescer.coalescedWm();
    }

    /**
     * Predicate used while draining a queue: it forwards ordinary items to
     * {@link #dest} and intercepts special items ({@link Watermark},
     * {@link SnapshotBarrier}, {@link DoneItem#DONE_ITEM}), storing them in
     * {@link #item} instead of forwarding them.
     */
    private static final class ItemDetector
    implements Predicate<Object> {
        Predicate<Object> dest;
        BroadcastItem item;

        private ItemDetector() {
        }

        /**
         * Prepares the detector for draining one queue.
         *
         * @param newDest destination for ordinary items
         */
        void reset(Predicate<Object> newDest) {
            this.dest = newDest;
            this.item = null;
        }

        /**
         * @param o the item taken from the queue
         * @return {@code false} when {@code o} is a special item (it is captured
         *         in {@link #item}); otherwise the result of {@code dest.test(o)}.
         *         NOTE(review): returning {@code false} presumably stops the
         *         pipe's drain loop — confirm against the {@code Pipe.drain} contract.
         */
        @Override
        public boolean test(Object o) {
            if (o instanceof Watermark || o instanceof SnapshotBarrier || o == DoneItem.DONE_ITEM) {
                assert (this.item == null) : "Received multiple special items without a call to reset(): " + this.item;
                this.item = (BroadcastItem)o;
                return false;
            }
            return this.dest.test(o);
        }
    }
}

