/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.Pipe;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.Watermark;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.SkewReductionPolicy;
import com.hazelcast.util.function.Predicate;
import java.util.Collection;

public class ConcurrentInboundEdgeStream
implements InboundEdgeStream {
    private final int ordinal;
    private final int priority;
    private final ConcurrentConveyor<Object> conveyor;
    private final ProgressTracker tracker = new ProgressTracker();
    private final WatermarkDetector wmDetector = new WatermarkDetector();
    private long lastEmittedWm = Long.MIN_VALUE;
    private final SkewReductionPolicy skewReductionPolicy;

    public ConcurrentInboundEdgeStream(ConcurrentConveyor<Object> conveyor, int ordinal, int priority) {
        this.conveyor = conveyor;
        this.ordinal = ordinal;
        this.priority = priority;
        this.skewReductionPolicy = new SkewReductionPolicy(conveyor.queueCount());
    }

    @Override
    public int ordinal() {
        return this.ordinal;
    }

    @Override
    public int priority() {
        return this.priority;
    }

    @Override
    public ProgressState drainTo(Collection<Object> dest) {
        long bottomWm;
        this.tracker.reset();
        for (int drainOrderIdx = 0; drainOrderIdx < this.conveyor.queueCount(); ++drainOrderIdx) {
            int queueIndex = this.skewReductionPolicy.toQueueIndex(drainOrderIdx);
            QueuedPipe<Object> q = this.conveyor.queue(queueIndex);
            if (q == null) continue;
            Watermark wm = this.drainUpToWm(q, dest);
            if (this.wmDetector.isDone) {
                this.conveyor.removeQueue(queueIndex);
                continue;
            }
            if (wm != null && this.skewReductionPolicy.observeWm(queueIndex, wm.timestamp())) break;
        }
        if ((bottomWm = this.skewReductionPolicy.bottomObservedWm()) > this.lastEmittedWm) {
            dest.add(new Watermark(bottomWm));
            this.lastEmittedWm = bottomWm;
        }
        return this.tracker.toProgressState();
    }

    private Watermark drainUpToWm(Pipe<Object> queue, Collection<Object> dest) {
        this.wmDetector.reset(dest);
        int drainedCount = queue.drain(this.wmDetector);
        this.tracker.mergeWith(ProgressState.valueOf(drainedCount > 0, this.wmDetector.isDone));
        this.wmDetector.dest = null;
        return this.wmDetector.wm;
    }

    private static final class WatermarkDetector
    implements Predicate<Object> {
        Collection<Object> dest;
        Watermark wm;
        boolean isDone;

        private WatermarkDetector() {
        }

        void reset(Collection<Object> newDest) {
            this.dest = newDest;
            this.wm = null;
            this.isDone = false;
        }

        @Override
        public boolean test(Object o) {
            if (o instanceof Watermark) {
                assert (this.wm == null) : "Received multiple Watermarks without a call to reset()";
                this.wm = (Watermark)o;
                return false;
            }
            if (o == DoneItem.DONE_ITEM) {
                this.isDone = true;
                return false;
            }
            this.dest.add(o);
            return true;
        }
    }
}

