/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
import reactor.core.publisher.Sinks;

/**
 * A queue that hands its elements, one at a time and in insertion order, to a drain
 * {@link Consumer}.
 *
 * <p>Elements may be added from multiple threads. A work-in-progress counter ensures that only
 * the caller that moves the counter from zero performs the draining; callers that arrive while a
 * drain is running only enqueue their element and record that more work is pending, and the
 * active drainer picks that element up before it exits.
 *
 * <p>NOTE(review): if the drain consumer throws, the exception propagates out of
 * {@link #addAndDrain(Object)} and the work-in-progress counter is left non-zero, so later calls
 * will enqueue without draining. Confirm whether callers rely on a failed emission being terminal.
 *
 * @param <T> the type of queued elements
 */
public final class DrainableQueue<T> {

    // The class literal can only name the raw type, so this updater is necessarily raw-typed.
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<DrainableQueue> DRAINS_IN_PROGRESS =
        AtomicIntegerFieldUpdater.newUpdater(DrainableQueue.class, "drainsInProgress");

    /** Number of drain requests not yet accounted for; non-zero while a drain loop is active. */
    private volatile int drainsInProgress;

    private final Queue<T> queue = new ConcurrentLinkedQueue<>();

    private final Consumer<? super T> drain;

    private DrainableQueue(Consumer<? super T> drain) {
        this.drain = drain;
    }

    /**
     * Creates a queue whose elements are drained by emitting them to the given sink using
     * {@code Sinks.EmitFailureHandler.FAIL_FAST}.
     *
     * @param sink the sink that receives each drained element via {@code emitNext}
     * @param <T>  the type of queued elements
     * @return a new, empty queue bound to {@code sink}
     */
    public static <T> DrainableQueue<T> onEmitNext(Sinks.Many<T> sink) {
        // Keep the element type as T: a DrainableQueue<Object> is neither assignable to the
        // declared return type nor able to feed a Sinks.Many<T>.
        return new DrainableQueue<>(t -> sink.emitNext(t, Sinks.EmitFailureHandler.FAIL_FAST));
    }

    /**
     * Enqueues the element and then attempts to drain the queue. If another drain is already
     * running, this call returns after enqueueing and the running drain handles the element.
     *
     * @param t the element to enqueue; the backing {@link ConcurrentLinkedQueue} rejects
     *          {@code null} with a {@link NullPointerException}
     */
    public void addAndDrain(T t) {
        queue.add(t);
        drain();
    }

    /**
     * Drains queued elements to the consumer unless a drain is already in progress.
     */
    private void drain() {
        // Only the caller that moves the counter off zero becomes the drainer.
        if (DRAINS_IN_PROGRESS.getAndIncrement(this) != 0) {
            return;
        }

        int missed = 1;
        do {
            T next;
            while ((next = queue.poll()) != null) {
                drain.accept(next);
            }
            // Subtract the requests already accounted for; a non-zero remainder means other
            // callers requested a drain while we were looping, so the queue is checked again.
            missed = DRAINS_IN_PROGRESS.addAndGet(this, -missed);
        } while (missed != 0);
    }
}

