/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.metrics;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.rx.action.Action;
import reactor.rx.subscription.PushSubscription;

/**
 * Action that counts the elements it receives and publishes the count as a {@code Long}.
 *
 * <p>The running count is broadcast each time it reaches a multiple of the interval given at
 * construction, and the total is broadcast once more when the upstream completes. A subscriber
 * that arrives after the upstream subscription already reports completion receives the final
 * count followed by {@code onComplete}.
 *
 * @param <T> type of the counted elements; their values are never inspected
 */
public class CountAction<T> extends Action<T, Long> {
    /** Number of elements seen by {@link #doNext(Object)} so far. */
    private final AtomicLong counter = new AtomicLong(0L);

    /**
     * Emission interval: the running count is broadcast whenever it is a multiple of this value.
     * Zero disables intermediate emissions (only the final count is published).
     */
    private final long i;

    /**
     * Creates a counting action.
     *
     * @param i interval between intermediate count emissions; {@code 0} means the count is
     *          only emitted on completion
     */
    public CountAction(long i) {
        this.i = i;
    }

    /**
     * Subscribes {@code subscriber} to the count.
     *
     * <p>If the upstream subscription already reports completion, the subscriber is handed a
     * one-shot subscription that replays the final count on its first demand; otherwise the
     * call is delegated to the superclass.
     *
     * @param subscriber the subscriber to receive count values
     */
    @Override
    public void subscribe(final Subscriber<? super Long> subscriber) {
        // Read the field once so the null check and isComplete() see the same reference.
        PushSubscription<?> sub = this.upstreamSubscription;
        if (sub != null && sub.isComplete()) {
            subscriber.onSubscribe(new Subscription(){
                /** Set once the count was delivered or the subscriber cancelled. */
                private final AtomicBoolean terminated = new AtomicBoolean(false);

                @Override
                public void request(long n) {
                    // Deliver the final count exactly once: repeated request() calls, or a
                    // request() after cancel(), must not signal the subscriber again.
                    if (this.terminated.compareAndSet(false, true)) {
                        subscriber.onNext(CountAction.this.counter.get());
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    // Turns any later request() into a no-op.
                    this.terminated.set(true);
                }
            });
        } else {
            super.subscribe(subscriber);
        }
    }

    /**
     * Increments the count and broadcasts it when it lands on a multiple of the interval.
     *
     * @param value the received element (ignored; only its arrival is counted)
     */
    @Override
    protected void doNext(T value) {
        long current = this.counter.incrementAndGet();
        // A zero interval would make the remainder operation throw ArithmeticException on
        // every element, so it is treated as "no intermediate emissions".
        if (this.i != 0L && current % this.i == 0L) {
            this.broadcastNext(current);
        }
    }

    /**
     * Broadcasts the total count, then delegates completion handling to the superclass.
     */
    @Override
    protected void doComplete() {
        this.broadcastNext(this.counter.get());
        super.doComplete();
    }
}

