/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
import reactor.core.publisher.Sinks;

/**
 * A queue whose elements are handed to a single downstream {@link Consumer} one at a time.
 *
 * <p>Elements are buffered in a {@link ConcurrentLinkedQueue}. A counter of in-progress drain
 * requests decides who does the work: only the caller that moves the counter away from zero
 * runs the drain loop, while every other caller records its request by incrementing the counter
 * and returns immediately, leaving its element for the active drainer to pick up.
 *
 * @param <T> the type of element accepted by this queue
 */
public final class SerialQueue<T> {

    // A class literal cannot carry type arguments, so the updater is necessarily declared raw.
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<SerialQueue> DRAINS_IN_PROGRESS =
        AtomicIntegerFieldUpdater.newUpdater(SerialQueue.class, "drainsInProgress");

    /** Number of drain requests not yet acknowledged by the active drainer; zero when idle. */
    private volatile int drainsInProgress;

    private final Queue<T> queue = new ConcurrentLinkedQueue<>();

    private final Consumer<? super T> drain;

    private SerialQueue(Consumer<? super T> drain) {
        this.drain = drain;
    }

    /**
     * Creates a queue of actions that are applied, one at a time, to the given resource.
     *
     * @param resource the value passed to every queued consumer
     * @param <T>      the type of the resource
     * @return a queue whose elements are consumers invoked with {@code resource}
     */
    public static <T> SerialQueue<Consumer<T>> on(T resource) {
        return new SerialQueue<>(consumer -> consumer.accept(resource));
    }

    /**
     * Creates a queue that forwards each element to {@code sink.emitNext} using
     * {@link Sinks.EmitFailureHandler#FAIL_FAST}.
     *
     * @param sink the sink to emit queued elements on
     * @param <T>  the type of element emitted
     * @return a queue that emits its elements on {@code sink}
     */
    public static <T> SerialQueue<T> onEmitNext(Sinks.Many<T> sink) {
        return onEmitNext(sink, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    /**
     * Creates a queue that forwards each element to {@code sink.emitNext} with the given
     * failure handler.
     *
     * @param sink               the sink to emit queued elements on
     * @param emitFailureHandler the handler passed to every {@code emitNext} call
     * @param <T>                the type of element emitted
     * @return a queue that emits its elements on {@code sink}
     */
    public static <T> SerialQueue<T> onEmitNext(Sinks.Many<T> sink, Sinks.EmitFailureHandler emitFailureHandler) {
        // Diamond lets the element type be inferred as T; an explicit <Object> does not type-check.
        return new SerialQueue<>(t -> sink.emitNext(t, emitFailureHandler));
    }

    /**
     * Enqueues the element and then attempts to drain the queue.
     *
     * <p>If another drain is already running, this call only enqueues and registers a drain
     * request; the element is then left for the running drain loop.
     *
     * @param t the element to enqueue; the backing {@link ConcurrentLinkedQueue} rejects
     *          {@code null}
     */
    public void addAndDrain(T t) {
        queue.add(t);
        drain();
    }

    /**
     * Runs the drain loop if no other drain is in progress.
     *
     * <p>If the downstream consumer throws, the in-progress counter is reset to zero before the
     * exception propagates, so that a later {@link #addAndDrain(Object)} can start a new drain.
     * Elements still buffered at that point stay in the queue until that later call.
     */
    private void drain() {
        if (DRAINS_IN_PROGRESS.getAndIncrement(this) != 0) {
            // Someone else is draining; the increment above tells them to loop again.
            return;
        }

        boolean completed = false;
        try {
            int missed = 1;
            do {
                T next;
                while ((next = queue.poll()) != null) {
                    drain.accept(next);
                }
                // Acknowledge the requests handled so far; a non-zero result means more
                // requests arrived while draining, so go around again.
                missed = DRAINS_IN_PROGRESS.addAndGet(this, -missed);
            } while (missed != 0);
            completed = true;
        } finally {
            if (!completed) {
                // Without this reset the counter would stay non-zero forever and every
                // subsequent drain attempt would return immediately.
                DRAINS_IN_PROGRESS.set(this, 0);
            }
        }
    }
}

