/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.aggregation;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.fn.Supplier;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.BehaviorBroadcaster;
import reactor.rx.broadcast.Broadcaster;

/**
 * Action that splits the incoming sequence into windows, each exposed as a
 * {@link Stream}, where a window is closed every time a separate "boundary"
 * {@link Publisher} emits.
 *
 * <p>Lifecycle, as implemented below:
 * <ul>
 *   <li>when this action is subscribed, the boundary publisher is obtained from
 *       the supplier and subscribed to, one element at a time;</li>
 *   <li>the first value arriving while no window is open creates a new window
 *       through {@link BehaviorBroadcaster#first} and hands it to
 *       {@code broadcastNext}; later values are pushed into that window;</li>
 *   <li>each boundary element completes the open window (if any);</li>
 *   <li>a terminal signal from either the main sequence or the boundary
 *       terminates the open window, releases the boundary subscription and is
 *       forwarded to the superclass.</li>
 * </ul>
 *
 * <p>NOTE(review): only the terminal paths visible in this class
 * ({@link #doError}, {@link #doComplete}) release the boundary subscription.
 * If the superclass has a separate downstream-cancellation hook, the boundary
 * should probably be released there as well — confirm against {@code Action}.
 *
 * @param <T> type of the values being windowed
 */
public class WindowWhenAction<T>
extends Action<T, Stream<T>> {
    private final Supplier<? extends Publisher<?>> boundarySupplier;
    private final Environment environment;
    private final Dispatcher dispatcher;

    /** Window currently receiving values, or {@code null} when none is open. */
    private Broadcaster<T> windowBroadcaster;

    /**
     * Subscription to the boundary publisher, or {@code null} before it is
     * established and after it has been released. Volatile because it is written
     * from the boundary subscriber's callbacks and read from this action's own
     * terminal hooks, which are not guaranteed by this class to share a thread.
     */
    private volatile Subscription boundarySubscription;

    /**
     * @param environment      environment handed to every window broadcaster and
     *                         returned by {@link #getEnvironment()}
     * @param dispatcher       dispatcher handed to every window broadcaster and
     *                         returned by {@link #getDispatcher()}
     * @param boundarySupplier supplies the publisher whose emissions close the
     *                         current window; invoked once per call to
     *                         {@link #doOnSubscribe(Subscription)}
     */
    public WindowWhenAction(Environment environment, Dispatcher dispatcher, Supplier<? extends Publisher<?>> boundarySupplier) {
        this.boundarySupplier = boundarySupplier;
        this.environment = environment;
        this.dispatcher = dispatcher;
    }

    /**
     * Delegates to the superclass and then subscribes to the boundary publisher,
     * requesting boundary elements one at a time.
     */
    @Override
    protected void doOnSubscribe(Subscription subscription) {
        super.doOnSubscribe(subscription);
        this.boundarySupplier.get().subscribe(new Subscriber<Object>() {

            @Override
            public void onSubscribe(Subscription boundary) {
                // Published on the enclosing action so its terminal hooks can release it.
                WindowWhenAction.this.boundarySubscription = boundary;
                boundary.request(1L);
            }

            @Override
            public void onNext(Object o) {
                WindowWhenAction.this.flush();
                // Ask for the next boundary only while the subscription is still held;
                // once the action has terminated the field is null and demand stops.
                Subscription boundary = WindowWhenAction.this.boundarySubscription;
                if (boundary != null) {
                    boundary.request(1L);
                }
            }

            @Override
            public void onError(Throwable t) {
                // Release first so that doError() does not cancel a second time.
                WindowWhenAction.this.cancelBoundary();
                WindowWhenAction.this.onError(t);
            }

            @Override
            public void onComplete() {
                // Release first so that doComplete() does not cancel a second time.
                WindowWhenAction.this.cancelBoundary();
                WindowWhenAction.this.onComplete();
            }
        });
    }

    /**
     * Cancels the boundary subscription if one is currently held and forgets it.
     * Safe to call repeatedly; calls after the first find nothing to cancel.
     */
    private void cancelBoundary() {
        Subscription boundary = this.boundarySubscription;
        if (boundary != null) {
            this.boundarySubscription = null;
            boundary.cancel();
        }
    }

    /** Completes the open window, if any, and marks that no window is open. */
    private void flush() {
        Broadcaster<T> _currentWindow = this.windowBroadcaster;
        if (_currentWindow != null) {
            // Cleared before completing so the next value starts a fresh window.
            this.windowBroadcaster = null;
            _currentWindow.onComplete();
        }
    }

    /**
     * Routes a value into the open window, or opens a new window seeded with the
     * value and hands that window to {@code broadcastNext}.
     */
    @Override
    protected void doNext(T value) {
        // Read the field once: flush() is driven by the boundary subscriber and may
        // clear it between a null check and a second read.
        Broadcaster<T> window = this.windowBroadcaster;
        if (window == null) {
            this.broadcastNext(this.createWindowStream(value));
        } else {
            window.onNext(value);
        }
    }

    /**
     * @return the window currently receiving values, or {@code null} when no
     *         window is open
     */
    public Broadcaster<T> currentWindow() {
        return this.windowBroadcaster;
    }

    /**
     * Creates a new window via {@link BehaviorBroadcaster#first} using this
     * action's environment and dispatcher, and records it as the open window.
     *
     * @param first the value that triggered creation of the window
     * @return the newly created window
     */
    protected Stream<T> createWindowStream(T first) {
        Broadcaster<T> action = BehaviorBroadcaster.first(first, this.environment, this.dispatcher);
        this.windowBroadcaster = action;
        return action;
    }

    /**
     * Fails the open window (if any) with {@code ev}, releases the boundary
     * subscription and delegates to the superclass.
     */
    @Override
    protected void doError(Throwable ev) {
        if (this.windowBroadcaster != null) {
            this.windowBroadcaster.onError(ev);
            this.windowBroadcaster = null;
        }
        // Without this the boundary publisher would stay subscribed after the
        // main sequence has failed.
        this.cancelBoundary();
        super.doError(ev);
    }

    /**
     * Completes the open window (if any), releases the boundary subscription and
     * delegates to the superclass.
     */
    @Override
    protected void doComplete() {
        if (this.windowBroadcaster != null) {
            this.windowBroadcaster.onComplete();
            this.windowBroadcaster = null;
        }
        // Without this the boundary publisher would stay subscribed after the
        // main sequence has completed.
        this.cancelBoundary();
        super.doComplete();
    }

    /** @return the environment supplied at construction */
    @Override
    public final Environment getEnvironment() {
        return this.environment;
    }

    /** @return the dispatcher supplied at construction */
    @Override
    public Dispatcher getDispatcher() {
        return this.dispatcher;
    }
}

