/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import io.atleon.core.AcknowledgementQueue;
import io.atleon.core.Alo;
import io.atleon.core.AloComponentExtractor;
import io.atleon.core.AloFactory;
import io.atleon.core.AloQueueListener;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class AloQueueingOperator<T, V>
implements Publisher<Alo<V>> {
    private final Publisher<? extends T> source;
    private final Function<T, ?> groupExtractor;
    private final Supplier<? extends AcknowledgementQueue> queueSupplier;
    private final AloQueueListener listener;
    private final AloComponentExtractor<T, V> componentExtractor;
    private final AloFactory<V> factory;
    private final long maxInFlight;

    AloQueueingOperator(Publisher<? extends T> source, Function<T, ?> groupExtractor, Supplier<? extends AcknowledgementQueue> queueSupplier, AloQueueListener listener, AloComponentExtractor<T, V> componentExtractor, AloFactory<V> factory, long maxInFlight) {
        this.source = source;
        this.groupExtractor = groupExtractor;
        this.queueSupplier = queueSupplier;
        this.listener = listener;
        this.componentExtractor = componentExtractor;
        this.factory = factory;
        this.maxInFlight = maxInFlight;
    }

    public void subscribe(Subscriber<? super Alo<V>> actual) {
        AloQueueingSubscriber<T, V> queueingSubscriber = new AloQueueingSubscriber<T, V>(actual, this.groupExtractor, this.queueSupplier, this.listener, this.componentExtractor, this.factory, this.maxInFlight);
        this.source.subscribe(queueingSubscriber);
    }

    private static final class AloQueueingSubscriber<T, V>
    implements Subscriber<T>,
    Subscription {
        private static final AtomicLongFieldUpdater<AloQueueingSubscriber> FREE_CAPACITY = AtomicLongFieldUpdater.newUpdater(AloQueueingSubscriber.class, "freeCapacity");
        private static final AtomicLongFieldUpdater<AloQueueingSubscriber> REQUEST_OUTSTANDING = AtomicLongFieldUpdater.newUpdater(AloQueueingSubscriber.class, "requestOutstanding");
        private static final AtomicIntegerFieldUpdater<AloQueueingSubscriber> REQUESTS_IN_PROGRESS = AtomicIntegerFieldUpdater.newUpdater(AloQueueingSubscriber.class, "requestsInProgress");
        private final Subscriber<? super Alo<V>> actual;
        private final Function<T, ?> groupExtractor;
        private final Supplier<? extends AcknowledgementQueue> queueSupplier;
        private final AloQueueListener listener;
        private final AloComponentExtractor<T, V> componentExtractor;
        private final AloFactory<V> factory;
        private final Map<Object, AcknowledgementQueue> queuesByGroup = new ConcurrentHashMap<Object, AcknowledgementQueue>();
        private Subscription parent;
        private volatile long freeCapacity;
        private volatile long requestOutstanding;
        private volatile int requestsInProgress;

        public AloQueueingSubscriber(Subscriber<? super Alo<V>> actual, Function<T, ?> groupExtractor, Supplier<? extends AcknowledgementQueue> queueSupplier, AloQueueListener listener, AloComponentExtractor<T, V> componentExtractor, AloFactory<V> factory, long maxInFlight) {
            this.actual = actual;
            this.groupExtractor = groupExtractor;
            this.queueSupplier = queueSupplier;
            this.listener = listener;
            this.componentExtractor = componentExtractor;
            this.factory = factory;
            this.freeCapacity = maxInFlight;
        }

        public void onSubscribe(Subscription s) {
            this.parent = s;
            this.actual.onSubscribe((Subscription)this);
        }

        public void onNext(T t) {
            Object group = this.groupExtractor.apply(t);
            AcknowledgementQueue queue = this.queuesByGroup.computeIfAbsent(group, this::newQueueForGroup);
            AcknowledgementQueue.InFlight inFlight = queue.add(this.componentExtractor.nativeAcknowledger(t), this.componentExtractor.nativeNacknowledger(t));
            this.listener.enqueued(group, 1L);
            Runnable acknowledger = () -> this.postComplete(group, queue.complete(inFlight));
            Consumer<Throwable> nacknowedger = error -> this.postComplete(group, queue.completeExceptionally(inFlight, (Throwable)error));
            this.actual.onNext(this.factory.create(this.componentExtractor.value(t), acknowledger, nacknowedger));
        }

        public void onError(Throwable t) {
            this.listener.close();
            this.actual.onError(t);
        }

        public void onComplete() {
            this.listener.close();
            this.actual.onComplete();
        }

        public void request(long requested) {
            if (requested > 0L) {
                REQUEST_OUTSTANDING.addAndGet(this, requested);
                this.drainRequest();
            }
        }

        public void cancel() {
            try {
                this.parent.cancel();
            }
            finally {
                this.listener.close();
            }
        }

        private AcknowledgementQueue newQueueForGroup(Object group) {
            this.listener.created(group);
            return this.queueSupplier.get();
        }

        private void postComplete(Object group, long drainedFromQueue) {
            if (drainedFromQueue > 0L) {
                this.listener.dequeued(group, drainedFromQueue);
                if (this.freeCapacity != Long.MAX_VALUE) {
                    FREE_CAPACITY.addAndGet(this, drainedFromQueue);
                    this.drainRequest();
                }
            }
        }

        private void drainRequest() {
            if (REQUESTS_IN_PROGRESS.getAndIncrement(this) != 0) {
                return;
            }
            int missed = 1;
            do {
                long toRequest;
                if ((toRequest = Math.min(this.freeCapacity, this.requestOutstanding)) <= 0L) continue;
                if (this.freeCapacity != Long.MAX_VALUE) {
                    FREE_CAPACITY.addAndGet(this, -toRequest);
                }
                REQUEST_OUTSTANDING.addAndGet(this, -toRequest);
                this.parent.request(toRequest);
            } while ((missed = REQUESTS_IN_PROGRESS.addAndGet(this, -missed)) != 0);
        }
    }
}

