/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.terminal;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.fn.Consumer;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.subscription.PushSubscription;

/**
 * Terminal {@link Action} that hands every incoming value to a {@link Consumer}
 * callback and reports error and completion signals to optional dedicated
 * callbacks.
 *
 * <p>The action emits nothing downstream: its output type is {@link Void} and the
 * subscription it creates for subscribers ignores {@code request(n)} calls.
 *
 * @param <T> type of the values consumed
 */
public final class ConsumerAction<T> extends Action<T, Void> {
    /**
     * Updater for {@link #count}. Declared {@code static} so that a single reflective
     * updater is shared by every instance instead of being rebuilt per action.
     */
    private static final AtomicLongFieldUpdater<ConsumerAction> COUNTED =
            AtomicLongFieldUpdater.newUpdater(ConsumerAction.class, "count");

    /** Callback invoked for each value; may be {@code null}. */
    private final Consumer<? super T> consumer;
    /** Callback invoked with the error signal; may be {@code null}. */
    private final Consumer<? super Throwable> errorConsumer;
    /** Callback invoked (with {@code null}) on completion; may be {@code null}. */
    private final Consumer<Void> completeConsumer;
    /** Dispatcher used to route request signals to the upstream subscription. */
    private final Dispatcher dispatcher;

    /** Running demand counter, only ever accessed through {@link #COUNTED}. */
    private volatile long count;
    /** Demand accumulated while no upstream subscription is attached; guarded by {@code this}. */
    private long pendingRequests;

    /**
     * Creates the action with an unbounded capacity ({@link Long#MAX_VALUE}).
     *
     * @param dispatcher       dispatcher for upstream requests; {@link SynchronousDispatcher#INSTANCE}
     *                         is swapped for {@link TailRecurseDispatcher#INSTANCE}
     * @param consumer         value callback, or {@code null}; when non-null the initial pending
     *                         demand is primed with the full capacity
     * @param errorConsumer    error callback, or {@code null}
     * @param completeConsumer completion callback, or {@code null}
     */
    public ConsumerAction(Dispatcher dispatcher, Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Consumer<Void> completeConsumer) {
        this.consumer = consumer;
        // NOTE(review): presumably the swap avoids unbounded recursion on synchronous
        // request/onNext ping-pong - confirm against the dispatcher documentation.
        this.dispatcher = dispatcher == SynchronousDispatcher.INSTANCE ? TailRecurseDispatcher.INSTANCE : dispatcher;
        this.errorConsumer = errorConsumer;
        this.completeConsumer = completeConsumer;
        this.capacity = Long.MAX_VALUE;
        if (consumer != null) {
            this.pendingRequests = this.capacity;
        }
    }

    /**
     * Signals additional demand.
     *
     * <p>With an upstream subscription attached, the demand is clamped to the capacity,
     * added to the running counter (saturating at {@link Long#MAX_VALUE}) and handed to
     * the upstream subscription through the dispatcher. Without one, the demand is
     * accumulated in the pending counter (also saturating) under the instance lock.
     *
     * @param n number of elements requested
     */
    @Override
    public void requestMore(long n) {
        final PushSubscription<?> upstream = this.upstreamSubscription;
        if (upstream == null) {
            synchronized (this) {
                this.pendingRequests += n;
                if (this.pendingRequests < 0L) {
                    // Overflowed: treat as unbounded demand.
                    this.pendingRequests = Long.MAX_VALUE;
                }
            }
            return;
        }

        final long toRequest = Math.min(n, this.capacity);
        // Saturating add performed as one atomic step. A separate add-then-set would let a
        // concurrent decrement observe the overflowed (negative) value or be overwritten.
        long current;
        long updated;
        do {
            current = COUNTED.get(this);
            updated = current + toRequest;
            if (updated < 0L) {
                updated = Long.MAX_VALUE;
            }
        } while (!COUNTED.compareAndSet(this, current, updated));

        // Raw casts retained from the decompiled form: the value is boxed and the
        // subscription is passed as the consumer of the request signal.
        this.dispatcher.dispatch((Object)toRequest, (Consumer)upstream, null);
    }

    /**
     * Passes the value to the consumer callback, if any. When an upstream subscription is
     * attached and the capacity is bounded, the running counter is decremented and a new
     * batch of {@code capacity} elements is requested once it reaches zero.
     *
     * @param ev the incoming value
     */
    @Override
    protected void doNext(T ev) {
        if (this.consumer != null) {
            this.consumer.accept(ev);
        }
        if (this.upstreamSubscription != null && this.capacity != Long.MAX_VALUE && COUNTED.decrementAndGet(this) == 0L) {
            this.requestMore(this.capacity);
        }
    }

    /**
     * Drains the demand accumulated before subscription and forwards it through
     * {@link #requestMore(long)} when it is positive.
     *
     * @param subscription the upstream subscription (not used directly here)
     */
    @Override
    protected void doOnSubscribe(Subscription subscription) {
        final long toRequest;
        synchronized (this) {
            toRequest = this.pendingRequests;
            this.pendingRequests = 0L;
        }
        if (toRequest > 0L) {
            this.requestMore(toRequest);
        }
    }

    /**
     * Cancels this action, notifies the error callback if one was supplied, then
     * delegates to the superclass error handling.
     *
     * @param ev the error signal
     */
    @Override
    protected void doError(Throwable ev) {
        this.cancel();
        if (this.errorConsumer != null) {
            this.errorConsumer.accept(ev);
        }
        super.doError(ev);
    }

    /**
     * Cancels this action, notifies the completion callback (with {@code null}) if one
     * was supplied, then delegates to the superclass completion handling.
     */
    @Override
    protected void doComplete() {
        this.cancel();
        if (this.completeConsumer != null) {
            this.completeConsumer.accept(null);
        }
        super.doComplete();
    }

    /**
     * Creates the subscription handed to subscribers of this action. Its
     * {@code request(n)} is a no-op.
     *
     * @param subscriber   the subscriber
     * @param reactivePull ignored by this implementation
     * @return a subscription that ignores demand
     */
    @Override
    protected PushSubscription<Void> createSubscription(Subscriber<? super Void> subscriber, boolean reactivePull) {
        return new PushSubscription<Void>((Stream)this, subscriber){

            @Override
            public void request(long n) {
                // Intentionally empty: this action never emits downstream values.
            }
        };
    }

    /**
     * Reports whether this action has a bounded capacity.
     *
     * @param dispatcher       ignored
     * @param producerCapacity ignored
     * @return {@code true} when the capacity differs from {@link Long#MAX_VALUE}
     */
    @Override
    public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
        return this.capacity != Long.MAX_VALUE;
    }

    /**
     * @return the dispatcher selected in the constructor
     */
    @Override
    public Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /**
     * @return the superclass description followed by the pending demand
     */
    @Override
    public String toString() {
        return super.toString() + "{pending=" + this.pendingRequests + "}";
    }
}

