/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import io.atleon.core.AcknowledgementQueue;
import io.atleon.core.Alo;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class AloQueueingSubscriber<T, A extends Alo<T>>
implements Subscriber<A>,
Subscription {
    private static final AtomicLongFieldUpdater<AloQueueingSubscriber> FREE_CAPACITY = AtomicLongFieldUpdater.newUpdater(AloQueueingSubscriber.class, "freeCapacity");
    private static final AtomicLongFieldUpdater<AloQueueingSubscriber> REQUEST_OUTSTANDING = AtomicLongFieldUpdater.newUpdater(AloQueueingSubscriber.class, "requestOutstanding");
    private static final AtomicIntegerFieldUpdater<AloQueueingSubscriber> REQUESTS_IN_PROGRESS = AtomicIntegerFieldUpdater.newUpdater(AloQueueingSubscriber.class, "requestsInProgress");
    private final Map<Object, AcknowledgementQueue> queuesByGroup = new ConcurrentHashMap<Object, AcknowledgementQueue>();
    private final Subscriber<? super Alo<T>> actual;
    private final Function<T, ?> groupExtractor;
    private final Supplier<? extends AcknowledgementQueue> queueSupplier;
    private Subscription parent;
    private volatile long freeCapacity;
    private volatile long requestOutstanding;
    private volatile int requestsInProgress;

    AloQueueingSubscriber(Subscriber<? super Alo<T>> actual, Function<T, ?> groupExtractor, Supplier<? extends AcknowledgementQueue> queueSupplier, long maxInFlight) {
        this.actual = actual;
        this.queueSupplier = queueSupplier;
        this.groupExtractor = groupExtractor;
        this.freeCapacity = maxInFlight;
    }

    public void onSubscribe(Subscription s) {
        this.parent = s;
        this.actual.onSubscribe((Subscription)this);
    }

    public void onNext(A a) {
        AcknowledgementQueue queue = this.queuesByGroup.computeIfAbsent(this.groupExtractor.apply(a.get()), group -> this.queueSupplier.get());
        AcknowledgementQueue.InFlight inFlight = queue.add(a.getAcknowledger(), a.getNacknowledger());
        Runnable acknowledger = () -> this.postComplete(queue.complete(inFlight));
        Consumer<Throwable> nacknowedger = error -> this.postComplete(queue.completeExceptionally(inFlight, (Throwable)error));
        this.actual.onNext(a.propagator().create(a.get(), acknowledger, nacknowedger));
    }

    public void onError(Throwable t) {
        this.actual.onError(t);
    }

    public void onComplete() {
        this.actual.onComplete();
    }

    public void request(long requested) {
        if (requested > 0L) {
            REQUEST_OUTSTANDING.addAndGet(this, requested);
            this.drainRequest();
        }
    }

    public void cancel() {
        this.parent.cancel();
    }

    private void postComplete(long drainedFromQueue) {
        if (this.freeCapacity != Long.MAX_VALUE && drainedFromQueue > 0L) {
            FREE_CAPACITY.addAndGet(this, drainedFromQueue);
            this.drainRequest();
        }
    }

    private void drainRequest() {
        if (REQUESTS_IN_PROGRESS.getAndIncrement(this) != 0) {
            return;
        }
        int missed = 1;
        do {
            long toRequest;
            if ((toRequest = Math.min(this.freeCapacity, this.requestOutstanding)) <= 0L) continue;
            if (this.freeCapacity != Long.MAX_VALUE) {
                FREE_CAPACITY.addAndGet(this, -toRequest);
            }
            REQUEST_OUTSTANDING.addAndGet(this, -toRequest);
            this.parent.request(toRequest);
        } while ((missed = REQUESTS_IN_PROGRESS.addAndGet(this, -missed)) != 0);
    }
}

