/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.reactive.client.internal.api;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
import org.jctools.queues.MpmcArrayQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

public class InflightLimiter
implements PublisherTransformer {
    public static final int DEFAULT_MAX_PENDING_SUBSCRIPTIONS = 1024;
    private final MpmcArrayQueue<InflightLimiterSubscriber<?>> pendingSubscriptions;
    private final AtomicInteger inflight = new AtomicInteger();
    private final AtomicInteger activeSubscriptions = new AtomicInteger();
    private final int maxInflight;
    private final int expectedSubscriptionsInflight;
    private final Scheduler.Worker triggerNextWorker;
    private final AtomicBoolean triggerNextTriggered = new AtomicBoolean(false);

    public InflightLimiter(int maxInflight) {
        this(maxInflight, 0, Schedulers.single(), 1024);
    }

    public InflightLimiter(int maxInflight, int expectedSubscriptionsInflight, Scheduler triggerNextScheduler, int maxPendingSubscriptions) {
        if (maxInflight < 1) {
            throw new IllegalArgumentException("maxInflight must be greater than 0");
        }
        this.maxInflight = maxInflight;
        this.expectedSubscriptionsInflight = expectedSubscriptionsInflight;
        this.triggerNextWorker = triggerNextScheduler.createWorker();
        if (expectedSubscriptionsInflight > maxInflight) {
            throw new IllegalArgumentException("maxSubscriptionInflight must be equal or less than maxInflight.");
        }
        this.pendingSubscriptions = new MpmcArrayQueue(maxPendingSubscriptions);
    }

    @Override
    public <T> Publisher<T> transform(Publisher<T> publisher) {
        if (publisher instanceof Mono) {
            return this.createOperator((Mono)publisher);
        }
        return this.createOperator(Flux.from(publisher));
    }

    <I> Flux<I> createOperator(Flux<I> source) {
        return new FluxOperator<I, I>(source){

            public void subscribe(CoreSubscriber<? super I> actual) {
                InflightLimiter.this.handleSubscribe(this.source, actual);
            }
        };
    }

    <I> Mono<I> createOperator(Mono<I> source) {
        return new MonoOperator<I, I>(source){

            public void subscribe(CoreSubscriber<? super I> actual) {
                InflightLimiter.this.handleSubscribe(this.source, actual);
            }
        };
    }

    <I> void handleSubscribe(Publisher<I> source, CoreSubscriber<? super I> actual) {
        this.activeSubscriptions.incrementAndGet();
        InflightLimiterSubscriber<I> subscriber = new InflightLimiterSubscriber<I>(actual, source);
        actual.onSubscribe(subscriber.getSubscription());
    }

    void maybeTriggerNext() {
        if (!this.triggerNextWorker.isDisposed() && this.inflight.get() < this.maxInflight && !this.pendingSubscriptions.isEmpty() && this.triggerNextTriggered.compareAndSet(false, true)) {
            this.triggerNextWorker.schedule(() -> {
                InflightLimiterSubscriber subscriber;
                this.triggerNextTriggered.set(false);
                int remainingSubscriptions = this.pendingSubscriptions.size();
                while (this.inflight.get() < this.maxInflight && remainingSubscriptions-- > 0 && (subscriber = (InflightLimiterSubscriber)((Object)((Object)this.pendingSubscriptions.poll()))) != null) {
                    if (subscriber.isDisposed()) continue;
                    subscriber.requestMore();
                }
            });
        }
    }

    void scheduleSubscribed(InflightLimiterSubscriber<?> subscriber) {
        if (!this.triggerNextWorker.isDisposed()) {
            this.triggerNextWorker.schedule(() -> {
                if (!subscriber.isDisposed()) {
                    subscriber.requestMore();
                }
            });
        }
    }

    public void dispose() {
        this.triggerNextWorker.dispose();
        this.pendingSubscriptions.drain(BaseSubscriber::cancel);
    }

    public boolean isDisposed() {
        return this.triggerNextWorker.isDisposed();
    }

    private class InflightLimiterSubscriber<I>
    extends BaseSubscriber<I> {
        private final CoreSubscriber<? super I> actual;
        private final Publisher<I> source;
        private final AtomicLong requestedDemand = new AtomicLong();
        private final AtomicReference<InflightLimiterSubscriberState> state = new AtomicReference<InflightLimiterSubscriberState>(InflightLimiterSubscriberState.INITIAL);
        private final AtomicInteger inflightForSubscription = new AtomicInteger();
        private final Subscription subscription = new Subscription(){

            public void request(long n) {
                if (n == Long.MAX_VALUE) {
                    InflightLimiterSubscriber.this.requestedDemand.set(n);
                } else if (InflightLimiterSubscriber.this.requestedDemand.get() != Long.MAX_VALUE) {
                    InflightLimiterSubscriber.this.requestedDemand.addAndGet(n);
                }
                InflightLimiterSubscriber.this.maybeAddToPending();
                InflightLimiter.this.maybeTriggerNext();
            }

            public void cancel() {
                InflightLimiterSubscriber.this.cancel();
            }
        };

        InflightLimiterSubscriber(CoreSubscriber<? super I> actual, Publisher<I> source) {
            this.actual = actual;
            this.source = source;
        }

        public Context currentContext() {
            return this.actual.currentContext();
        }

        protected void hookOnSubscribe(Subscription subscription) {
            if (this.state.compareAndSet(InflightLimiterSubscriberState.SUBSCRIBING, InflightLimiterSubscriberState.SUBSCRIBED)) {
                InflightLimiter.this.scheduleSubscribed(this);
            }
        }

        protected void hookOnNext(I value) {
            this.actual.onNext(value);
            InflightLimiter.this.inflight.decrementAndGet();
            this.inflightForSubscription.decrementAndGet();
            this.maybeAddToPending();
            InflightLimiter.this.maybeTriggerNext();
        }

        protected void hookOnComplete() {
            InflightLimiter.this.activeSubscriptions.decrementAndGet();
            this.actual.onComplete();
            this.clearInflight();
            InflightLimiter.this.maybeTriggerNext();
        }

        private void clearInflight() {
            InflightLimiter.this.inflight.addAndGet(-this.inflightForSubscription.getAndSet(0));
        }

        protected void hookOnError(Throwable throwable) {
            InflightLimiter.this.activeSubscriptions.decrementAndGet();
            this.actual.onError(throwable);
            this.clearInflight();
            InflightLimiter.this.maybeTriggerNext();
        }

        protected void hookOnCancel() {
            InflightLimiter.this.activeSubscriptions.decrementAndGet();
            this.clearInflight();
            this.requestedDemand.set(0L);
            InflightLimiter.this.maybeTriggerNext();
        }

        Subscription getSubscription() {
            return this.subscription;
        }

        void requestMore() {
            int maxInflightForSubscription = Math.max(InflightLimiter.this.maxInflight / Math.max(InflightLimiter.this.activeSubscriptions.get(), InflightLimiter.this.expectedSubscriptionsInflight), 1);
            if (this.requestedDemand.get() > 0L && (this.state.get() == InflightLimiterSubscriberState.SUBSCRIBED || this.inflightForSubscription.get() < maxInflightForSubscription && InflightLimiter.this.inflight.get() < InflightLimiter.this.maxInflight)) {
                if (this.state.compareAndSet(InflightLimiterSubscriberState.INITIAL, InflightLimiterSubscriberState.SUBSCRIBING)) {
                    InflightLimiter.this.inflight.incrementAndGet();
                    this.inflightForSubscription.incrementAndGet();
                    this.source.subscribe((Subscriber)this);
                } else if (this.state.get() == InflightLimiterSubscriberState.REQUESTING || this.state.get() == InflightLimiterSubscriberState.SUBSCRIBED) {
                    if (this.state.compareAndSet(InflightLimiterSubscriberState.SUBSCRIBED, InflightLimiterSubscriberState.REQUESTING)) {
                        InflightLimiter.this.inflight.decrementAndGet();
                        this.inflightForSubscription.decrementAndGet();
                    }
                    long maxRequest = Math.max(Math.min(this.requestedDemand.get(), (long)(maxInflightForSubscription - this.inflightForSubscription.get())), 1L);
                    InflightLimiter.this.inflight.addAndGet((int)maxRequest);
                    this.requestedDemand.addAndGet(-maxRequest);
                    this.inflightForSubscription.addAndGet((int)maxRequest);
                    this.request(maxRequest);
                }
            } else {
                this.maybeAddToPending();
            }
        }

        void maybeAddToPending() {
            if (this.requestedDemand.get() > 0L && !this.isDisposed() && this.inflightForSubscription.get() == 0) {
                InflightLimiter.this.pendingSubscriptions.add((Object)this);
            }
        }
    }

    private static enum InflightLimiterSubscriberState {
        INITIAL,
        SUBSCRIBING,
        SUBSCRIBED,
        REQUESTING;

    }
}

