/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.aggregation;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.fn.Consumer;
import reactor.fn.Pausable;
import reactor.fn.timer.Timer;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.ReactiveSubscription;

public class WindowShiftAction<T>
extends Action<T, Stream<T>> {
    private final Consumer<Long> timeshiftTask;
    private final List<ReactiveSubscription<T>> currentWindows = new LinkedList<ReactiveSubscription<T>>();
    private final int skip;
    private final int batchSize;
    private final long timeshift;
    private final TimeUnit unit;
    private final Timer timer;
    private final Environment environment;
    private final Dispatcher dispatcher;
    private int index;
    private Pausable timeshiftRegistration;

    public WindowShiftAction(Environment environment, Dispatcher dispatcher, int size, int skip) {
        this(environment, dispatcher, size, skip, -1L, -1L, null, null);
    }

    public WindowShiftAction(Environment environment, final Dispatcher dispatcher, int size, int skip, final long timespan, long timeshift, TimeUnit unit, final Timer timer) {
        this.dispatcher = dispatcher;
        this.skip = skip;
        this.environment = environment;
        this.batchSize = size;
        if (timespan > 0L && timeshift > 0L) {
            final TimeUnit targetUnit = unit != null ? unit : TimeUnit.SECONDS;
            final Consumer flushTimerTask = new Consumer<ReactiveSubscription<T>>(){

                public void accept(ReactiveSubscription<T> bucket) {
                    Iterator it = WindowShiftAction.this.currentWindows.iterator();
                    while (it.hasNext()) {
                        ReactiveSubscription itBucket = (ReactiveSubscription)it.next();
                        if (bucket != itBucket) continue;
                        it.remove();
                        bucket.onComplete();
                        break;
                    }
                }
            };
            this.timeshiftTask = new Consumer<Long>(){

                public void accept(Long aLong) {
                    WindowShiftAction.this.timeshiftRegistration = null;
                    if (!WindowShiftAction.this.isPublishing()) {
                        return;
                    }
                    dispatcher.dispatch(null, (Consumer)new Consumer<Void>(){

                        public void accept(Void aVoid) {
                            final ReactiveSubscription bucket = WindowShiftAction.this.createWindowStream();
                            timer.submit((Consumer)new Consumer<Long>(){

                                public void accept(Long aLong) {
                                    dispatcher.dispatch((Object)bucket, flushTimerTask, null);
                                }
                            }, timespan, targetUnit);
                        }
                    }, null);
                }
            };
            this.timeshift = timeshift;
            this.unit = targetUnit;
            this.timer = timer;
        } else {
            this.timeshift = -1L;
            this.unit = null;
            this.timer = null;
            this.timeshiftTask = null;
        }
    }

    @Override
    protected void doNext(T value) {
        if (this.timer != null) {
            if (this.timeshiftRegistration == null) {
                this.timeshiftRegistration = this.timer.submit(this.timeshiftTask, this.timeshift, this.unit);
            }
        } else if (this.index++ % this.skip == 0) {
            this.createWindowStream();
        }
        this.flushCallback(value);
    }

    @Override
    protected void doComplete() {
        for (ReactiveSubscription<T> bucket : this.currentWindows) {
            bucket.onComplete();
        }
        this.currentWindows.clear();
        super.doComplete();
    }

    private void flushCallback(T event) {
        Iterator<ReactiveSubscription<T>> it = this.currentWindows.iterator();
        while (it.hasNext()) {
            ReactiveSubscription<T> bucket = it.next();
            bucket.onNext(event);
            if (bucket.currentNextSignals() != (long)this.batchSize) continue;
            it.remove();
            bucket.onComplete();
        }
    }

    protected ReactiveSubscription<T> createWindowStream() {
        Broadcaster action = Broadcaster.create(this.environment, this.dispatcher);
        ReactiveSubscription _currentWindow = new ReactiveSubscription(null, action);
        this.currentWindows.add(_currentWindow);
        ((Action)action).onSubscribe(_currentWindow);
        this.broadcastNext(action);
        return _currentWindow;
    }

    @Override
    public final Environment getEnvironment() {
        return this.environment;
    }

    @Override
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    @Override
    protected void doError(Throwable ev) {
        super.doError(ev);
        for (ReactiveSubscription<T> bucket : this.currentWindows) {
            bucket.onError(ev);
        }
        this.currentWindows.clear();
    }
}

