/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import io.atleon.core.AcknowledgementQueueMode;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
 * Queue of in-flight acknowledgements, kept as a doubly linked list of {@link InFlight} nodes.
 *
 * <p>Nodes are appended with {@link #add(Runnable, Consumer)} and later marked finished with
 * {@link #complete(InFlight)} or {@link #completeExceptionally(InFlight, Throwable)}. Every
 * successful completion triggers a drain, which executes (acknowledges or nacknowledges) nodes
 * starting at the head of the list for as long as the head is no longer in process. When the
 * mode is {@code AcknowledgementQueueMode.COMPACT}, a completed node that is not the head may
 * additionally cause adjacent successfully-completed nodes to be unlinked without being executed.
 *
 * <p>Coordination uses volatile fields with atomic field updaters and a drain counter rather
 * than locks.
 */
final class AcknowledgementQueue {
    // Atomic access to the "tail" field below.
    private static final AtomicReferenceFieldUpdater<AcknowledgementQueue, InFlight> TAIL = AtomicReferenceFieldUpdater.newUpdater(AcknowledgementQueue.class, InFlight.class, "tail");
    // Atomic access to the "drainsInProgress" field below.
    private static final AtomicIntegerFieldUpdater<AcknowledgementQueue> DRAINS_IN_PROGRESS = AtomicIntegerFieldUpdater.newUpdater(AcknowledgementQueue.class, "drainsInProgress");
    // Only consulted when a completed node is not the head (see drainNonHead).
    private final AcknowledgementQueueMode mode;
    // Completed nodes waiting to be processed by the drain loop.
    private final Queue<InFlight> drainQueue = new ConcurrentLinkedQueue<InFlight>();
    // Most recently added node; null initially and reset to null by drain() once it is severed.
    private volatile InFlight tail;
    // Number of drain requests not yet accounted for; non-zero means a drain loop is running.
    private volatile int drainsInProgress;

    private AcknowledgementQueue(AcknowledgementQueueMode mode) {
        this.mode = mode;
    }

    /**
     * Creates an empty queue.
     *
     * @param mode the mode; only a comparison against {@code COMPACT} is made by this class
     * @return a new queue
     */
    public static AcknowledgementQueue create(AcknowledgementQueueMode mode) {
        return new AcknowledgementQueue(mode);
    }

    /**
     * Appends a new in-flight node and makes it the tail.
     *
     * <p>If the observed tail is {@code null} or already severed, the new node is created with no
     * predecessor (and is therefore a head). Otherwise the new node is linked after the observed
     * tail by CAS-ing that tail's {@code next} from {@code null}; if the CAS fails (the tail got
     * a successor or was severed in the meantime) the tail is re-read and the attempt is repeated
     * with a freshly allocated node.
     *
     * @param acknowledger   run when the node is executed without an error
     * @param nacknowledger  invoked with the recorded error when the node is executed with one
     * @return the newly added node, to be passed back to {@code complete}/{@code completeExceptionally}
     */
    public InFlight add(Runnable acknowledger, Consumer<? super Throwable> nacknowledger) {
        InFlight newTail;
        InFlight previous;
        do {
            InFlight observedTail;
            // A severed tail has already been drained, so it must not become a predecessor.
            previous = (observedTail = this.tail) == null || observedTail.isSevered() ? null : observedTail;
            newTail = new InFlight(acknowledger, nacknowledger, previous);
        } while (previous != null && !previous.casNext(null, newTail));
        // NOTE(review): the tail is published with a plain set after the link CAS; presumably
        // callers serialize invocations of add - confirm against call sites.
        TAIL.set(this, newTail);
        return newTail;
    }

    /**
     * Marks the node as successfully completed and, if this call performed the transition,
     * requests a drain.
     *
     * @param toComplete the node to complete
     * @return the count reported by the drain run by this call; {@code 0} if the node was not in
     *         process anymore or if another drain loop was already running
     */
    public long complete(InFlight toComplete) {
        return this.complete(toComplete, rec$ -> ((InFlight)rec$).complete()) ? this.drainFrom(toComplete) : 0L;
    }

    /**
     * Marks the node as completed with an error and, if this call performed the transition,
     * requests a drain.
     *
     * @param toComplete the node to complete
     * @param error      the error handed to the node's nacknowledger when it is executed
     * @return the count reported by the drain run by this call; {@code 0} if the node could not be
     *         transitioned by this call or if another drain loop was already running
     */
    public long completeExceptionally(InFlight toComplete, Throwable error) {
        return this.complete(toComplete, inFlight -> ((InFlight)inFlight).completeExceptionally(error)) ? this.drainFrom(toComplete) : 0L;
    }

    // Applies the given completion action to the node and reports whether it took effect.
    private boolean complete(InFlight inFlight, Predicate<InFlight> completer) {
        return completer.test(inFlight);
    }

    // Enqueues the completed node for processing and attempts to run the drain loop.
    private long drainFrom(InFlight completed) {
        this.drainQueue.add(completed);
        return this.drain();
    }

    /**
     * Processes queued completed nodes.
     *
     * <p>Only the caller that increments the counter from zero runs the loop; every other caller
     * returns {@code 0} immediately, and its increment keeps the running loop going for another
     * pass so that the node it enqueued is still processed.
     *
     * @return sum of the counts returned by {@code drainHead} and {@code drainNonHead} for the
     *         nodes processed by this call
     */
    private long drain() {
        if (DRAINS_IN_PROGRESS.getAndIncrement(this) != 0) {
            return 0L;
        }
        long drained = 0L;
        // Number of drain requests covered by the current pass.
        int missed = 1;
        do {
            InFlight completed = this.drainQueue.poll();
            while (completed != null) {
                // Severed nodes have already been handled (executed or compacted away).
                if (!completed.isSevered()) {
                    drained += completed.isHead() ? this.drainHead(completed) : this.drainNonHead(completed);
                }
                completed = this.drainQueue.poll();
            }
            InFlight observedTail = this.tail;
            // "continue" in a do-while jumps to the loop condition below.
            if (observedTail == null || !observedTail.isSevered()) continue;
            // Drop the reference to a severed tail, unless add() has replaced it meanwhile.
            TAIL.compareAndSet(this, observedTail, null);
            // Subtract the requests handled so far; a non-zero remainder means more arrived.
        } while ((missed = DRAINS_IN_PROGRESS.addAndGet(this, -missed)) != 0);
        return drained;
    }

    /**
     * Executes nodes in list order starting at {@code head}, stopping at the first node that is
     * still in process or at the end of the list. Each node is severed before it is executed.
     *
     * @param head the node to start from
     * @return number of nodes severed and passed to {@code execute()}
     */
    private long drainHead(InFlight head) {
        long drained = 0L;
        while (head != null && !head.isInProcess()) {
            InFlight next = head.sever();
            head.execute();
            ++drained;
            head = next;
        }
        return drained;
    }

    // Non-head nodes cannot be executed yet; in COMPACT mode try to unlink neighbours instead.
    private long drainNonHead(InFlight nonHead) {
        return this.mode == AcknowledgementQueueMode.COMPACT ? nonHead.tryCompact() : 0L;
    }

    /**
     * A node in the acknowledgement list, holding the callbacks to run once it is executed.
     *
     * <p>A node whose {@code previous} is {@code null} is a head; a node whose {@code next}
     * points at itself has been severed from the list.
     */
    static final class InFlight {
        private static final AtomicReferenceFieldUpdater<InFlight, InFlight> NEXT = AtomicReferenceFieldUpdater.newUpdater(InFlight.class, InFlight.class, "next");
        private static final AtomicReferenceFieldUpdater<InFlight, State> STATE = AtomicReferenceFieldUpdater.newUpdater(InFlight.class, State.class, "state");
        private static final AtomicReferenceFieldUpdater<InFlight, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(InFlight.class, Throwable.class, "error");
        private final Runnable acknowledger;
        private final Consumer<? super Throwable> nacknowledger;
        // Predecessor in the list; null marks this node as a head.
        private volatile InFlight previous;
        // Successor in the list; null when there is none, this node itself once severed.
        private volatile InFlight next;
        private volatile State state = State.IN_PROCESS;
        // Set at most once (CAS from null) by completeExceptionally.
        private volatile Throwable error;

        private InFlight(Runnable acknowledger, Consumer<? super Throwable> nacknowledger, InFlight previous) {
            this.acknowledger = acknowledger;
            this.nacknowledger = nacknowledger;
            this.previous = previous;
        }

        /**
         * Attempts to unlink successfully-completed nodes around this node.
         *
         * <p>Does nothing unless this node itself completed without error. Otherwise up to two
         * nodes are removed: the predecessor, if it completed without error and is not a head;
         * and this node itself, if its successor completed without error. The remaining left
         * and right neighbours are then linked to each other. Unlinked nodes are not executed
         * here.
         *
         * <p>NOTE(review): dereferences {@code previous} without a null check; the only visible
         * caller invokes this for nodes that were not a head when checked - confirm that holds.
         *
         * @return number of nodes unlinked (0, 1 or 2)
         */
        private long tryCompact() {
            long compacted = 0L;
            if (!this.isCompletedWithoutError()) {
                return compacted;
            }
            InFlight left = this.previous;
            if (left.isCompletedWithoutError() && !left.isHead()) {
                // Remove the predecessor: mark it severed, step left past it, and clear the
                // removed node's back link (left.next still points at the removed node here).
                left.sever();
                left = left.previous;
                left.next.previous = null;
                ++compacted;
            }
            InFlight right = this;
            if (right.next != null && right.next.isCompletedWithoutError()) {
                // Remove this node: clear its back link, step right past it, and clear its
                // forward link (right.previous still points at this node here).
                right.previous = null;
                right = right.next;
                NEXT.lazySet(right.previous, null);
                ++compacted;
            }
            if (compacted != 0L) {
                // Splice the surviving neighbours together.
                NEXT.set(left, right);
                right.previous = left;
            }
            return compacted;
        }

        /**
         * Marks this node as severed by pointing {@code next} at itself, and clears the former
         * successor's {@code previous} link so that the successor becomes a head.
         *
         * @return the former successor, or {@code null} if there was none
         */
        private InFlight sever() {
            InFlight observedNext = this.next;
            // Retry until the self-link replaces exactly the value that was observed.
            while (!this.casNext(observedNext, this)) {
                observedNext = this.next;
            }
            if (observedNext != null) {
                observedNext.previous = null;
            }
            return observedNext;
        }

        // True when this node has no predecessor.
        private boolean isHead() {
            return this.previous == null;
        }

        // True when next is the self-link installed by sever().
        private boolean isSevered() {
            return this.next == this;
        }

        // Atomically replaces next if it currently equals expect.
        private boolean casNext(InFlight expect, InFlight next) {
            return NEXT.compareAndSet(this, expect, next);
        }

        private boolean isInProcess() {
            return this.state == State.IN_PROCESS;
        }

        private boolean isCompletedWithoutError() {
            return this.state == State.COMPLETED && this.error == null;
        }

        /**
         * Records the error (only if none was recorded yet) and then transitions the node to
         * COMPLETED.
         *
         * @return true only if the node was observed in process, this call stored the error, and
         *         this call performed the state transition
         */
        private boolean completeExceptionally(Throwable error) {
            return this.state == State.IN_PROCESS && ERROR.compareAndSet(this, null, error) && this.complete();
        }

        // Transitions IN_PROCESS -> COMPLETED; returns false if the node was in any other state.
        private boolean complete() {
            return STATE.compareAndSet(this, State.IN_PROCESS, State.COMPLETED);
        }

        /**
         * Moves the node to EXECUTED and invokes the matching callback: the acknowledger when no
         * error was recorded, otherwise the nacknowledger with the recorded error. Does nothing
         * if the node was already EXECUTED.
         */
        private void execute() {
            if (STATE.getAndSet(this, State.EXECUTED) == State.EXECUTED) {
                return;
            }
            if (this.error == null) {
                this.acknowledger.run();
            } else {
                this.nacknowledger.accept(this.error);
            }
        }

        // Lifecycle of a node: IN_PROCESS is the initial state, COMPLETED is set by complete(),
        // and EXECUTED is set by execute().
        private static enum State {
            IN_PROCESS,
            COMPLETED,
            EXECUTED;

        }
    }
}

