/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.terminal;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.core.support.NonBlocking;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.PushSubscription;

public final class AdaptiveConsumerAction<T>
extends Action<T, Void> {
    private final Consumer<? super T> consumer;
    private final Dispatcher dispatcher;
    private final Broadcaster<Long> requestMapperStream;
    private final AtomicLongFieldUpdater<AdaptiveConsumerAction> COUNTED = AtomicLongFieldUpdater.newUpdater(AdaptiveConsumerAction.class, "counted");
    private final AtomicLongFieldUpdater<AdaptiveConsumerAction> COUNTING = AtomicLongFieldUpdater.newUpdater(AdaptiveConsumerAction.class, "counting");
    private volatile long counted;
    private volatile long counting;
    private long pendingRequests;

    public AdaptiveConsumerAction(Dispatcher dispatcher, long initCapacity, Consumer<? super T> consumer, Function<Stream<Long>, ? extends Publisher<? extends Long>> requestMapper) {
        this.consumer = consumer;
        this.requestMapperStream = Broadcaster.create();
        this.requestMapperStream.onSubscribe(new Subscription(){

            public void request(long n) {
            }

            public void cancel() {
                PushSubscription subscription = AdaptiveConsumerAction.this.upstreamSubscription;
                if (subscription != null) {
                    AdaptiveConsumerAction.this.upstreamSubscription = null;
                    subscription.cancel();
                }
            }
        });
        this.dispatcher = SynchronousDispatcher.INSTANCE == dispatcher ? TailRecurseDispatcher.INSTANCE : dispatcher;
        this.capacity = initCapacity;
        Publisher afterRequestStream = (Publisher)requestMapper.apply(this.requestMapperStream);
        afterRequestStream.subscribe((Subscriber)new RequestSubscriber());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void requestMore(long n) {
        if (this.upstreamSubscription != null) {
            this.requestMapperStream.onNext(n);
        } else {
            AdaptiveConsumerAction adaptiveConsumerAction = this;
            synchronized (adaptiveConsumerAction) {
                if ((this.pendingRequests += n) < 0L) {
                    this.pendingRequests = Long.MAX_VALUE;
                }
            }
        }
    }

    @Override
    protected void doNext(T ev) {
        if (this.consumer != null) {
            this.consumer.accept(ev);
        }
        this.COUNTING.incrementAndGet(this);
        if (this.upstreamSubscription != null && this.capacity != Long.MAX_VALUE && this.COUNTED.decrementAndGet(this) == 0L) {
            this.requestMore(this.COUNTING.getAndSet(this, 0L));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doOnSubscribe(Subscription subscription) {
        long toRequest;
        AdaptiveConsumerAction adaptiveConsumerAction = this;
        synchronized (adaptiveConsumerAction) {
            toRequest = this.pendingRequests;
            this.pendingRequests = 0L;
        }
        if (toRequest > 0L) {
            this.requestMore(toRequest);
        }
    }

    @Override
    protected void doError(Throwable ev) {
        this.cancel();
        this.requestMapperStream.onError(ev);
        super.doError(ev);
    }

    @Override
    protected void doShutdown() {
        this.cancel();
        this.requestMapperStream.onComplete();
        super.doShutdown();
    }

    @Override
    protected PushSubscription<Void> createSubscription(Subscriber<? super Void> subscriber, boolean reactivePull) {
        return new PushSubscription<Void>((Stream)this, subscriber){

            @Override
            public void request(long n) {
            }
        };
    }

    @Override
    public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
        return this.capacity != Long.MAX_VALUE;
    }

    @Override
    public Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    @Override
    public String toString() {
        return super.toString() + "{pending=" + this.pendingRequests + "}";
    }

    private class RequestSubscriber
    implements Subscriber<Long>,
    NonBlocking {
        Subscription s;

        private RequestSubscriber() {
        }

        public void onSubscribe(Subscription s) {
            this.s = s;
            s.request(1L);
        }

        public void onNext(Long n) {
            PushSubscription upstreamSubscription;
            if (AdaptiveConsumerAction.this.COUNTED.addAndGet(AdaptiveConsumerAction.this, n) < 0L) {
                AdaptiveConsumerAction.this.COUNTED.set(AdaptiveConsumerAction.this, Long.MAX_VALUE);
            }
            if ((upstreamSubscription = AdaptiveConsumerAction.this.upstreamSubscription) != null) {
                AdaptiveConsumerAction.this.dispatcher.dispatch((Object)n, (Consumer)upstreamSubscription, null);
            }
            if (this.s != null) {
                this.s.request(1L);
            }
        }

        public void onError(Throwable t) {
            if (this.s != null) {
                this.s.cancel();
            }
        }

        public void onComplete() {
            if (this.s != null) {
                this.s.cancel();
            }
        }

        public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
            return AdaptiveConsumerAction.this.isReactivePull(dispatcher, producerCapacity);
        }

        public long getCapacity() {
            return AdaptiveConsumerAction.this.capacity;
        }
    }
}

