/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.control;

import java.util.concurrent.TimeUnit;
import reactor.core.Dispatcher;
import reactor.core.processor.InsufficientCapacityException;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.fn.Pausable;
import reactor.fn.timer.Timer;
import reactor.rx.action.Action;

public class ThrottleRequestAction<T>
extends Action<T, T> {
    private final Timer timer;
    private final long period;
    private final Consumer<Long> periodTask;
    private long pending;
    private Pausable timeoutRegistration;

    public ThrottleRequestAction(Dispatcher dispatcher, Timer timer, long period) {
        super(1L);
        Assert.state(timer != null, "Timer must be supplied");
        this.periodTask = new Consumer<Long>(){

            @Override
            public void accept(Long aLong) {
                if (ThrottleRequestAction.this.upstreamSubscription != null) {
                    try {
                        ThrottleRequestAction.this.upstreamSubscription.request(1L);
                    }
                    catch (InsufficientCapacityException insufficientCapacityException) {
                        // empty catch block
                    }
                }
            }
        };
        this.timer = timer;
        this.period = period;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void doNext(T ev) {
        this.broadcastNext(ev);
        ThrottleRequestAction throttleRequestAction = this;
        synchronized (throttleRequestAction) {
            if (this.pending != Long.MAX_VALUE) {
                --this.pending;
            }
        }
        if (this.pending > 0L) {
            this.timeoutRegistration = this.timer.submit(this.periodTask, this.period, TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void requestMore(long n) {
        ThrottleRequestAction throttleRequestAction = this;
        synchronized (throttleRequestAction) {
            if (this.pending != Long.MAX_VALUE) {
                this.pending += n;
                this.pending = this.pending < 0L ? Long.MAX_VALUE : this.pending;
            }
        }
        if (this.timeoutRegistration == null) {
            this.timeoutRegistration = this.timer.submit(this.periodTask, this.period, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
        return true;
    }

    @Override
    protected void doShutdown() {
        if (this.timeoutRegistration != null) {
            this.timeoutRegistration.cancel();
        }
        super.doShutdown();
    }
}

