/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.core.queue.CompletableBlockingQueue;
import reactor.core.queue.CompletableLinkedQueue;
import reactor.core.queue.CompletableQueue;
import reactor.core.support.Assert;
import reactor.core.support.Exceptions;
import reactor.core.support.NonBlocking;
import reactor.fn.BiConsumer;
import reactor.fn.BiFunction;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.fn.Predicate;
import reactor.fn.Supplier;
import reactor.fn.support.Tap;
import reactor.fn.timer.Timer;
import reactor.fn.tuple.Tuple2;
import reactor.fn.tuple.TupleN;
import reactor.io.codec.Codec;
import reactor.rx.Promise;
import reactor.rx.Streams;
import reactor.rx.action.Action;
import reactor.rx.action.CompositeAction;
import reactor.rx.action.Control;
import reactor.rx.action.Signal;
import reactor.rx.action.aggregation.BufferAction;
import reactor.rx.action.aggregation.BufferShiftAction;
import reactor.rx.action.aggregation.BufferShiftWhenAction;
import reactor.rx.action.aggregation.BufferWhenAction;
import reactor.rx.action.aggregation.CacheAction;
import reactor.rx.action.aggregation.LastAction;
import reactor.rx.action.aggregation.SampleAction;
import reactor.rx.action.aggregation.SortAction;
import reactor.rx.action.aggregation.WindowAction;
import reactor.rx.action.aggregation.WindowShiftAction;
import reactor.rx.action.aggregation.WindowShiftWhenAction;
import reactor.rx.action.aggregation.WindowWhenAction;
import reactor.rx.action.combination.ConcatAction;
import reactor.rx.action.combination.DynamicMergeAction;
import reactor.rx.action.combination.FanInAction;
import reactor.rx.action.combination.MergeAction;
import reactor.rx.action.combination.SwitchAction;
import reactor.rx.action.combination.ZipAction;
import reactor.rx.action.conditional.ExistsAction;
import reactor.rx.action.control.DispatcherAction;
import reactor.rx.action.control.FlowControlAction;
import reactor.rx.action.control.RepeatAction;
import reactor.rx.action.control.RepeatWhenAction;
import reactor.rx.action.control.ThrottleRequestAction;
import reactor.rx.action.control.ThrottleRequestWhenAction;
import reactor.rx.action.error.ErrorAction;
import reactor.rx.action.error.ErrorReturnAction;
import reactor.rx.action.error.ErrorWithValueAction;
import reactor.rx.action.error.IgnoreErrorAction;
import reactor.rx.action.error.RetryAction;
import reactor.rx.action.error.RetryWhenAction;
import reactor.rx.action.error.TimeoutAction;
import reactor.rx.action.filter.DistinctAction;
import reactor.rx.action.filter.DistinctUntilChangedAction;
import reactor.rx.action.filter.ElementAtAction;
import reactor.rx.action.filter.FilterAction;
import reactor.rx.action.filter.SkipAction;
import reactor.rx.action.filter.SkipUntilTimeout;
import reactor.rx.action.filter.TakeAction;
import reactor.rx.action.filter.TakeUntilTimeout;
import reactor.rx.action.filter.TakeWhileAction;
import reactor.rx.action.metrics.CountAction;
import reactor.rx.action.metrics.ElapsedAction;
import reactor.rx.action.metrics.TimestampAction;
import reactor.rx.action.passive.AfterAction;
import reactor.rx.action.passive.CallbackAction;
import reactor.rx.action.passive.FinallyAction;
import reactor.rx.action.passive.LoggerAction;
import reactor.rx.action.passive.StreamStateCallbackAction;
import reactor.rx.action.support.TapAndControls;
import reactor.rx.action.terminal.AdaptiveConsumerAction;
import reactor.rx.action.terminal.ConsumerAction;
import reactor.rx.action.transformation.DefaultIfEmptyAction;
import reactor.rx.action.transformation.DematerializeAction;
import reactor.rx.action.transformation.GroupByAction;
import reactor.rx.action.transformation.MapAction;
import reactor.rx.action.transformation.MaterializeAction;
import reactor.rx.action.transformation.ScanAction;
import reactor.rx.action.transformation.SplitAction;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.stream.GroupedStream;
import reactor.rx.stream.LiftStream;
import reactor.rx.subscription.PushSubscription;

public abstract class Stream<O>
implements Publisher<O>,
NonBlocking {
    private static final Consumer NOOP = new Consumer(){

        public void accept(Object o) {
        }
    };
    private static final SynchronousDispatcher PROCESSOR_SYNC = new SynchronousDispatcher();

    protected Stream() {
    }

    public final <E> Stream<E> cast(@Nonnull Class<E> stream) {
        return this;
    }

    public <V> Stream<V> lift(@Nonnull Supplier<? extends Action<O, V>> action) {
        return new LiftStream(this, action);
    }

    public final <E extends Throwable> Stream<O> when(final @Nonnull Class<E> exceptionType, final @Nonnull Consumer<E> onError) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new ErrorAction(exceptionType, onError, null);
            }
        });
    }

    public final <E extends Throwable> Stream<O> observeError(final @Nonnull Class<E> exceptionType, final @Nonnull BiConsumer<Object, ? super E> onError) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new ErrorWithValueAction(exceptionType, onError, null);
            }
        });
    }

    public final Stream<O> onErrorResumeNext(@Nonnull Publisher<? extends O> fallback) {
        return this.onErrorResumeNext(Throwable.class, fallback);
    }

    public final <E extends Throwable> Stream<O> onErrorResumeNext(final @Nonnull Class<E> exceptionType, final @Nonnull Publisher<? extends O> fallback) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new ErrorAction(exceptionType, null, fallback);
            }
        });
    }

    public final Stream<O> onErrorReturn(@Nonnull Function<Throwable, ? extends O> fallback) {
        return this.onErrorReturn(Throwable.class, fallback);
    }

    public final <E extends Throwable> Stream<O> onErrorReturn(final @Nonnull Class<E> exceptionType, final @Nonnull Function<E, ? extends O> fallback) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new ErrorReturnAction(exceptionType, fallback);
            }
        });
    }

    public final Stream<Void> after() {
        return this.lift(new Supplier<Action<O, Void>>(){

            @Override
            public Action<O, Void> get() {
                return new AfterAction();
            }
        });
    }

    public final Stream<Signal<O>> materialize() {
        return this.lift(new Supplier<Action<O, Signal<O>>>(){

            @Override
            public Action<O, Signal<O>> get() {
                return new MaterializeAction();
            }
        });
    }

    public final <X> Stream<X> dematerialize() {
        Stream thiz = this;
        return thiz.lift(new Supplier<Action<Signal<X>, X>>(){

            @Override
            public Action<Signal<X>, X> get() {
                return new DematerializeAction();
            }
        });
    }

    public final Stream<O> broadcast() {
        return this.broadcastOn(this.getDispatcher());
    }

    public final Stream<O> broadcastOn(Dispatcher dispatcher) {
        Broadcaster broadcaster = Broadcaster.create(this.getEnvironment(), dispatcher);
        return this.broadcastTo(broadcaster);
    }

    public final <E extends Subscriber<? super O>> E broadcastTo(E subscriber) {
        this.subscribe(subscriber);
        return subscriber;
    }

    public final TapAndControls<O> tap() {
        Tap tap = new Tap();
        return new TapAndControls(tap, this.consume(tap));
    }

    public final <E> Stream<E> process(final Processor<O, E> processor) {
        this.subscribe(processor);
        if (Stream.class.isAssignableFrom(processor.getClass())) {
            return (Stream)((Object)processor);
        }
        final long capacity = this.getCapacity();
        return new Stream<E>(){

            @Override
            public Dispatcher getDispatcher() {
                return PROCESSOR_SYNC;
            }

            @Override
            public long getCapacity() {
                return capacity;
            }

            @Override
            public Environment getEnvironment() {
                return Stream.this.getEnvironment();
            }

            @Override
            public void subscribe(Subscriber<? super E> s) {
                try {
                    processor.subscribe(s);
                }
                catch (Throwable t) {
                    s.onError(t);
                }
            }
        };
    }

    public Control consumeLater() {
        return this.consume(null);
    }

    public Control consume() {
        return this.consume(NOOP);
    }

    public Control consume(long n) {
        Control controls = this.consume(null);
        if (n > 0L) {
            controls.requestMore(n);
        }
        return controls;
    }

    public final Control consume(Consumer<? super O> consumer) {
        return this.consumeOn(this.getDispatcher(), consumer);
    }

    public final Control consumeOn(Dispatcher dispatcher, Consumer<? super O> consumer) {
        ConsumerAction<? super O> consumerAction = new ConsumerAction<O>(SynchronousDispatcher.INSTANCE != dispatcher && PROCESSOR_SYNC != dispatcher && dispatcher == this.getDispatcher() ? Long.MAX_VALUE : this.getCapacity(), dispatcher, consumer, null, null);
        this.subscribe(consumerAction);
        return consumerAction;
    }

    public final Control consume(Consumer<? super O> consumer, Consumer<? super Throwable> errorConsumer) {
        return this.consumeOn(this.getDispatcher(), consumer, errorConsumer);
    }

    public final Control consumeOn(Dispatcher dispatcher, Consumer<? super O> consumer, Consumer<? super Throwable> errorConsumer) {
        return this.consumeOn(dispatcher, consumer, errorConsumer, null);
    }

    public final Control consume(Consumer<? super O> consumer, Consumer<? super Throwable> errorConsumer, Consumer<Void> completeConsumer) {
        return this.consumeOn(this.getDispatcher(), consumer, errorConsumer, completeConsumer);
    }

    public final Control consumeOn(Dispatcher dispatcher, Consumer<? super O> consumer, Consumer<? super Throwable> errorConsumer, Consumer<Void> completeConsumer) {
        ConsumerAction<? super O> consumerAction = new ConsumerAction<O>(this.getCapacity(), dispatcher, consumer, errorConsumer, completeConsumer);
        this.subscribe(consumerAction);
        return consumerAction;
    }

    public final Control batchConsume(Consumer<? super O> consumer, Function<Long, ? extends Long> requestMapper) {
        return this.batchConsumeOn(this.getDispatcher(), consumer, requestMapper);
    }

    public final Control adaptiveConsume(Consumer<? super O> consumer, Function<Stream<Long>, ? extends Publisher<? extends Long>> requestMapper) {
        return this.adaptiveConsumeOn(this.getDispatcher(), consumer, requestMapper);
    }

    public final Control batchConsumeOn(Dispatcher dispatcher, Consumer<? super O> consumer, final Function<Long, ? extends Long> requestMapper) {
        return this.adaptiveConsumeOn(dispatcher, consumer, (Function<Stream<Long>, Publisher<Long>>)new Function<Stream<Long>, Publisher<? extends Long>>(){

            @Override
            public Publisher<? extends Long> apply(Stream<Long> longStream) {
                return longStream.map(requestMapper);
            }
        });
    }

    public final Control adaptiveConsumeOn(Dispatcher dispatcher, Consumer<? super O> consumer, Function<Stream<Long>, ? extends Publisher<? extends Long>> requestMapper) {
        AdaptiveConsumerAction<O> consumerAction = new AdaptiveConsumerAction<O>(dispatcher, this.getCapacity(), consumer, requestMapper);
        this.subscribe(consumerAction);
        if (consumer != null) {
            consumerAction.requestMore(consumerAction.getCapacity());
        }
        return consumerAction;
    }

    public final Stream<O> dispatchOn(@Nonnull Environment environment) {
        return this.dispatchOn(environment, environment.getDefaultDispatcher());
    }

    public final Stream<O> subscribeOn(@Nonnull Environment environment) {
        return this.subscribeOn(environment.getDefaultDispatcher());
    }

    public final Stream<O> dispatchOn(@Nonnull Dispatcher dispatcher) {
        return this.dispatchOn(null, dispatcher);
    }

    public final void subscribeOn(@Nonnull Dispatcher currentDispatcher, Subscriber<? super O> sub) {
        this.subscribeOn(currentDispatcher).subscribe(sub);
    }

    public final Stream<O> subscribeOn(@Nonnull Dispatcher currentDispatcher) {
        return new StreamDispatchedSubscribe(this, currentDispatcher);
    }

    public Stream<O> dispatchOn(final Environment environment, final @Nonnull Dispatcher dispatcher) {
        if (dispatcher == SynchronousDispatcher.INSTANCE) {
            if (environment != null && environment != this.getEnvironment()) {
                return this.env(environment);
            }
            return this;
        }
        Assert.state(dispatcher.supportsOrdering(), "Dispatcher provided doesn't support event ordering.  For concurrent signal dispatching, refer to #partition()/groupBy() method and assign individual single dispatchers. ");
        long _capacity = Action.evaluateCapacity(dispatcher.backlogSize());
        long parentCapacity = this.getCapacity();
        final Dispatcher parentDispatcher = this.getDispatcher();
        final long capacity = _capacity > parentCapacity ? parentCapacity : _capacity;
        return new LiftStream<O, O>(this, new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new DispatcherAction(dispatcher, parentDispatcher).capacity(capacity);
            }
        }){

            @Override
            public Dispatcher getDispatcher() {
                return dispatcher;
            }

            @Override
            public Environment getEnvironment() {
                return environment;
            }

            @Override
            public long getCapacity() {
                return capacity;
            }
        };
    }

    public final Stream<O> observe(final @Nonnull Consumer<? super O> consumer) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new CallbackAction(consumer, null);
            }
        });
    }

    public final Stream<O> cache() {
        CacheAction cacheAction = new CacheAction();
        this.subscribe(cacheAction);
        return cacheAction;
    }

    public final Stream<O> log() {
        return this.log(null);
    }

    public final Stream<O> log(final String name) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new LoggerAction(name);
            }
        });
    }

    public final Stream<O> observeComplete(final @Nonnull Consumer<Void> consumer) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new CallbackAction(null, consumer);
            }
        });
    }

    public final Stream<O> observeSubscribe(final @Nonnull Consumer<? super Subscriber<? super O>> consumer) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new StreamStateCallbackAction(consumer, null, null);
            }
        });
    }

    public final Stream<O> observeStart(final @Nonnull Consumer<? super Subscription> consumer) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new StreamStateCallbackAction(null, null, consumer);
            }
        });
    }

    public final Stream<O> observeCancel(final @Nonnull Consumer<Void> consumer) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new StreamStateCallbackAction(null, consumer, null);
            }
        });
    }

    public Stream<O> ignoreError() {
        return this.ignoreError((Predicate<Throwable>)new Predicate<Throwable>(){

            @Override
            public boolean test(Throwable o) {
                return true;
            }
        });
    }

    public <E> Stream<O> ignoreError(final Predicate<? super Throwable> ignorePredicate) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new IgnoreErrorAction(ignorePredicate);
            }
        });
    }

    public final Stream<O> finallyDo(final Consumer<Signal<O>> consumer) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new FinallyAction(consumer);
            }
        });
    }

    public final Stream<O> defaultIfEmpty(final O defaultValue) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new DefaultIfEmptyAction<Object>(defaultValue);
            }
        });
    }

    public final <V> Stream<V> map(final @Nonnull Function<? super O, ? extends V> fn) {
        return this.lift(new Supplier<Action<O, V>>(){

            @Override
            public Action<O, V> get() {
                return new MapAction(fn);
            }
        });
    }

    public final <V> Stream<V> flatMap(@Nonnull Function<? super O, ? extends Publisher<? extends V>> fn) {
        return this.map(fn).merge();
    }

    public final <V> Stream<V> switchMap(@Nonnull Function<? super O, Publisher<? extends V>> fn) {
        return this.map(fn).lift(new Supplier<Action<Publisher<? extends V>, V>>(){

            @Override
            public Action<Publisher<? extends V>, V> get() {
                return new SwitchAction(Stream.this.getDispatcher());
            }
        });
    }

    public final <V> Stream<V> concatMap(@Nonnull Function<? super O, Publisher<? extends V>> fn) {
        return this.map(fn).lift(new Supplier<Action<Publisher<? extends V>, V>>(){

            @Override
            public Action<Publisher<? extends V>, V> get() {
                return new ConcatAction();
            }
        });
    }

    public final <V> Stream<V> decode(final Codec<O, V, ?> codec) {
        return new Stream<V>(){

            @Override
            public void subscribe(Subscriber<? super V> s) {
                codec.decode(Stream.this).subscribe(s);
            }

            @Override
            public long getCapacity() {
                return Stream.this.getCapacity();
            }

            @Override
            public Dispatcher getDispatcher() {
                return Stream.this.getDispatcher();
            }

            @Override
            public Environment getEnvironment() {
                return Stream.this.getEnvironment();
            }
        };
    }

    public final <V> Stream<V> encode(final Codec<V, ?, O> codec) {
        return new Stream<V>(){

            @Override
            public void subscribe(Subscriber<? super V> s) {
                codec.encode(Stream.this).subscribe(s);
            }

            @Override
            public long getCapacity() {
                return Stream.this.getCapacity();
            }

            @Override
            public Dispatcher getDispatcher() {
                return Stream.this.getDispatcher();
            }

            @Override
            public Environment getEnvironment() {
                return Stream.this.getEnvironment();
            }
        };
    }

    public final <V> Stream<V> merge() {
        return this.fanIn(null);
    }

    public final Stream<O> mergeWith(final Publisher<? extends O> publisher) {
        return new Stream<O>(){

            @Override
            public void subscribe(Subscriber<? super O> s) {
                new MergeAction((Dispatcher)SynchronousDispatcher.INSTANCE, Arrays.asList(Stream.this, publisher)).subscribe(s);
            }

            @Override
            public Environment getEnvironment() {
                return Stream.this.getEnvironment();
            }

            @Override
            public long getCapacity() {
                return Stream.this.getCapacity();
            }

            @Override
            public Dispatcher getDispatcher() {
                return Stream.this.getDispatcher();
            }
        };
    }

    public final Stream<O> concatWith(final Publisher<? extends O> publisher) {
        return new Stream<O>(){

            @Override
            public void subscribe(Subscriber<? super O> s) {
                Stream<Publisher> just = Streams.just(Stream.this, publisher);
                ConcatAction concatAction = new ConcatAction();
                concatAction.subscribe(s);
                just.subscribe(concatAction);
            }

            @Override
            public long getCapacity() {
                return Stream.this.getCapacity();
            }

            @Override
            public Dispatcher getDispatcher() {
                return Stream.this.getDispatcher();
            }

            @Override
            public Environment getEnvironment() {
                return Stream.this.getEnvironment();
            }
        };
    }

    public final Stream<O> startWith(Iterable<O> iterable) {
        return this.startWith((Publisher<? extends O>)Streams.from(iterable));
    }

    public final Stream<O> startWith(O value) {
        return this.startWith((Publisher<? extends O>)Streams.just(value));
    }

    public final Stream<O> startWith(Publisher<? extends O> publisher) {
        if (publisher == null) {
            return this;
        }
        return Streams.concat(publisher, this);
    }

    public final <V> Stream<List<V>> join() {
        return this.zip(ZipAction.joinZipper());
    }

    public final <V> Stream<List<V>> joinWith(Publisher<? extends V> publisher) {
        return this.zipWith(publisher, ZipAction.joinZipper());
    }

    public final <V> Stream<V> zip(final @Nonnull Function<TupleN, ? extends V> zipper) {
        Stream thiz = this;
        return thiz.lift(new Supplier<Action<Publisher<?>, V>>(){

            @Override
            public Action<Publisher<?>, V> get() {
                return new DynamicMergeAction(new ZipAction(SynchronousDispatcher.INSTANCE, zipper, null)).capacity(Stream.this.getCapacity());
            }
        });
    }

    public final <T2, V> Stream<V> zipWith(Iterable<? extends T2> iterable, @Nonnull Function<Tuple2<O, T2>, V> zipper) {
        return this.zipWith(Streams.from(iterable), zipper);
    }

    public final <T2, V> Stream<V> zipWith(final Publisher<? extends T2> publisher, final @Nonnull Function<Tuple2<O, T2>, V> zipper) {
        return new Stream<V>(){

            @Override
            public void subscribe(Subscriber<? super V> s) {
                new ZipAction(SynchronousDispatcher.INSTANCE, zipper, Arrays.asList(Stream.this, publisher)).subscribe(s);
            }

            @Override
            public long getCapacity() {
                return Stream.this.getCapacity();
            }

            @Override
            public Dispatcher getDispatcher() {
                return Stream.this.getDispatcher();
            }

            @Override
            public Environment getEnvironment() {
                return Stream.this.getEnvironment();
            }
        };
    }

    public <T, V> Stream<V> fanIn(final FanInAction<T, ?, V, ? extends FanInAction.InnerSubscriber<T, ?, V>> fanInAction) {
        Stream thiz = this;
        return thiz.lift(new Supplier<Action<Publisher<? extends T>, V>>(){

            @Override
            public Action<Publisher<? extends T>, V> get() {
                return new DynamicMergeAction(fanInAction).capacity(Stream.this.getCapacity());
            }
        });
    }

    public Stream<O> capacity(final long elements) {
        if (elements == this.getCapacity()) {
            return this;
        }
        return new Stream<O>(){

            @Override
            public void subscribe(Subscriber<? super O> s) {
                Stream.this.subscribe(s);
            }

            @Override
            public Dispatcher getDispatcher() {
                return Stream.this.getDispatcher();
            }

            @Override
            public Environment getEnvironment() {
                return Stream.this.getEnvironment();
            }

            @Override
            public long getCapacity() {
                return elements;
            }
        };
    }

    public final Stream<O> unbounded() {
        return this.capacity(Long.MAX_VALUE);
    }

    public final Stream<O> onOverflowBuffer() {
        return this.onOverflowBuffer(new Supplier<CompletableQueue<O>>(){

            @Override
            public CompletableQueue<O> get() {
                return new CompletableLinkedQueue();
            }
        });
    }

    public Stream<O> onOverflowBuffer(final Supplier<? extends CompletableQueue<O>> queueSupplier) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new FlowControlAction(queueSupplier);
            }
        });
    }

    public final Stream<O> onOverflowDrop() {
        return this.onOverflowBuffer(null);
    }

    public final Stream<O> filter(final Predicate<? super O> p) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new FilterAction(p);
            }
        });
    }

    public final Stream<Boolean> filter() {
        return this.filter(FilterAction.simplePredicate);
    }

    public final Stream<Stream<O>> nest() {
        return Streams.just(this);
    }

    public final Stream<O> retry() {
        return this.retry(-1);
    }

    public final Stream<O> retry(int numRetries) {
        return this.retry(numRetries, null);
    }

    public final Stream<O> retry(Predicate<Throwable> retryMatcher) {
        return this.retry(-1, retryMatcher);
    }

    public final Stream<O> retry(final int numRetries, final Predicate<Throwable> retryMatcher) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new RetryAction(Stream.this.getDispatcher(), numRetries, retryMatcher, Stream.this);
            }
        });
    }

    public final Stream<O> recover(final @Nonnull Class<? extends Throwable> exceptionType, final Subscriber<Object> recoveredValuesSink) {
        return this.retryWhen(new Function<Stream<? extends Throwable>, Publisher<?>>(){

            @Override
            public Publisher<?> apply(Stream<? extends Throwable> stream) {
                stream.map(new Function<Throwable, Object>(){

                    @Override
                    public Object apply(Throwable throwable) {
                        if (exceptionType.isAssignableFrom(throwable.getClass())) {
                            return Exceptions.getFinalValueCause(throwable);
                        }
                        return null;
                    }
                }).subscribe(recoveredValuesSink);
                return stream.map(new Function<Throwable, Signal<Throwable>>(){

                    @Override
                    public Signal<Throwable> apply(Throwable throwable) {
                        if (exceptionType.isAssignableFrom(throwable.getClass())) {
                            return Signal.next(throwable);
                        }
                        return Signal.error(throwable);
                    }
                }).dematerialize();
            }
        });
    }

    public final Stream<O> retryWhen(final Function<? super Stream<? extends Throwable>, ? extends Publisher<?>> backOffStream) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new RetryWhenAction(Stream.this.getDispatcher(), backOffStream, Stream.this);
            }
        });
    }

    public final Stream<O> repeat() {
        return this.repeat(-1);
    }

    public final Stream<O> repeat(final int numRepeat) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new RepeatAction(Stream.this.getDispatcher(), numRepeat, Stream.this);
            }
        });
    }

    public final Stream<O> repeatWhen(final Function<? super Stream<? extends Long>, ? extends Publisher<?>> backOffStream) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new RepeatWhenAction(Stream.this.getDispatcher(), backOffStream, Stream.this);
            }
        });
    }

    public final Stream<O> last() {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new LastAction();
            }
        });
    }

    public final Stream<O> take(final long max) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new TakeAction(max);
            }
        });
    }

    public final Stream<O> take(long time, TimeUnit unit) {
        return this.take(time, unit, this.getTimer());
    }

    public final Stream<O> take(final long time, final TimeUnit unit, final Timer timer) {
        if (time > 0L) {
            Assert.isTrue(timer != null, "Timer can't be found, try assigning an environment to the stream");
            return this.lift(new Supplier<Action<O, O>>(){

                @Override
                public Action<O, O> get() {
                    return new TakeUntilTimeout(Stream.this.getDispatcher(), time, unit, timer);
                }
            });
        }
        return Streams.empty();
    }

    public final Stream<O> takeWhile(final Predicate<O> limitMatcher) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new TakeWhileAction(limitMatcher);
            }
        });
    }

    public final Stream<O> skip(long max) {
        return this.skipWhile(max, null);
    }

    public final Stream<O> skip(long time, TimeUnit unit) {
        return this.skip(time, unit, this.getTimer());
    }

    public final Stream<O> skip(final long time, final TimeUnit unit, final Timer timer) {
        if (time > 0L) {
            Assert.isTrue(timer != null, "Timer can't be found, try assigning an environment to the stream");
            return this.lift(new Supplier<Action<O, O>>(){

                @Override
                public Action<O, O> get() {
                    return new SkipUntilTimeout(time, unit, timer);
                }
            });
        }
        return this;
    }

    public final Stream<O> skipWhile(Predicate<O> limitMatcher) {
        return this.skipWhile(Long.MAX_VALUE, limitMatcher);
    }

    public final Stream<O> skipWhile(final long max, final Predicate<O> limitMatcher) {
        if (max > 0L) {
            return this.lift(new Supplier<Action<O, O>>(){

                @Override
                public Action<O, O> get() {
                    return new SkipAction(limitMatcher, max);
                }
            });
        }
        return this;
    }

    public final Stream<Tuple2<Long, O>> timestamp() {
        return this.lift(new Supplier<Action<O, Tuple2<Long, O>>>(){

            @Override
            public Action<O, Tuple2<Long, O>> get() {
                return new TimestampAction();
            }
        });
    }

    public final Stream<Tuple2<Long, O>> elapsed() {
        return this.lift(new Supplier<Action<O, Tuple2<Long, O>>>(){

            @Override
            public Action<O, Tuple2<Long, O>> get() {
                return new ElapsedAction();
            }
        });
    }

    public final Stream<O> elementAt(final int index) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new ElementAtAction(index);
            }
        });
    }

    public final Stream<O> elementAtOrDefault(final int index, final O defaultValue) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new ElementAtAction<Object>(index, defaultValue);
            }
        });
    }

    public final Stream<O> sampleFirst() {
        return this.sampleFirst((int)Math.min(Integer.MAX_VALUE, this.getCapacity()));
    }

    public final Stream<O> sampleFirst(final int batchSize) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new SampleAction(Stream.this.getDispatcher(), batchSize, true);
            }
        });
    }

    public final Stream<O> sampleFirst(long timespan, TimeUnit unit) {
        return this.sampleFirst(timespan, unit, this.getTimer());
    }

    public final Stream<O> sampleFirst(long timespan, TimeUnit unit, Timer timer) {
        return this.sampleFirst(Integer.MAX_VALUE, timespan, unit, timer);
    }

    public final Stream<O> sampleFirst(int maxSize, long timespan, TimeUnit unit) {
        return this.sampleFirst(maxSize, timespan, unit, this.getTimer());
    }

    public final Stream<O> sampleFirst(final int maxSize, final long timespan, final TimeUnit unit, final Timer timer) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new SampleAction(Stream.this.getDispatcher(), true, maxSize, timespan, unit, timer);
            }
        });
    }

    public final Stream<O> sample() {
        return this.sample((int)Math.min(Integer.MAX_VALUE, this.getCapacity()));
    }

    public final Stream<O> sample(final int batchSize) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new SampleAction(Stream.this.getDispatcher(), batchSize);
            }
        });
    }

    public final Stream<O> sample(long timespan, TimeUnit unit) {
        return this.sample(timespan, unit, this.getTimer());
    }

    public final Stream<O> sample(long timespan, TimeUnit unit, Timer timer) {
        return this.sample(Integer.MAX_VALUE, timespan, unit, timer);
    }

    public final Stream<O> sample(int maxSize, long timespan, TimeUnit unit) {
        return this.sample(maxSize, timespan, unit, this.getEnvironment() == null ? Environment.timer() : this.getEnvironment().getTimer());
    }

    public final Stream<O> sample(final int maxSize, final long timespan, final TimeUnit unit, final Timer timer) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new SampleAction(Stream.this.getDispatcher(), false, maxSize, timespan, unit, timer);
            }
        });
    }

    public final Stream<O> distinctUntilChanged() {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new DistinctUntilChangedAction(null);
            }
        });
    }

    public final <V> Stream<O> distinctUntilChanged(final Function<? super O, ? extends V> keySelector) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new DistinctUntilChangedAction(keySelector);
            }
        });
    }

    public final Stream<O> distinct() {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new DistinctAction(null);
            }
        });
    }

    public final <V> Stream<O> distinct(final Function<? super O, ? extends V> keySelector) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new DistinctAction(keySelector);
            }
        });
    }

    public final Stream<Boolean> exists(final Predicate<? super O> predicate) {
        return this.lift(new Supplier<Action<O, Boolean>>(){

            @Override
            public Action<O, Boolean> get() {
                return new ExistsAction(predicate);
            }
        });
    }

    public final <V> Stream<V> split() {
        return this.split(Long.MAX_VALUE);
    }

    public final <V> Stream<V> split(final long batchSize) {
        Stream iterableStream = this;
        return iterableStream.lift(new Supplier<Action<Iterable<? extends V>, V>>(){

            @Override
            public Action<Iterable<? extends V>, V> get() {
                return new SplitAction().capacity(batchSize);
            }
        });
    }

    public final Stream<List<O>> buffer() {
        return this.buffer((int)Math.min(Integer.MAX_VALUE, this.getCapacity()));
    }

    public final Stream<List<O>> buffer(final int maxSize) {
        return this.lift(new Supplier<Action<O, List<O>>>(){

            @Override
            public Action<O, List<O>> get() {
                return new BufferAction(Stream.this.getDispatcher(), maxSize);
            }
        });
    }

    public final Stream<List<O>> buffer(final Publisher<?> bucketOpening, final Supplier<? extends Publisher<?>> boundarySupplier) {
        return this.lift(new Supplier<Action<O, List<O>>>(){

            @Override
            public Action<O, List<O>> get() {
                return new BufferShiftWhenAction(bucketOpening, boundarySupplier);
            }
        });
    }

    public final Stream<List<O>> buffer(final Supplier<? extends Publisher<?>> boundarySupplier) {
        return this.lift(new Supplier<Action<O, List<O>>>(){

            @Override
            public Action<O, List<O>> get() {
                return new BufferWhenAction(boundarySupplier);
            }
        });
    }

    public final Stream<List<O>> buffer(final int maxSize, final int skip) {
        if (maxSize == skip) {
            return this.buffer(maxSize);
        }
        return this.lift(new Supplier<Action<O, List<O>>>(){

            @Override
            public Action<O, List<O>> get() {
                return new BufferShiftAction(Stream.this.getDispatcher(), maxSize, skip);
            }
        });
    }

    public final Stream<List<O>> buffer(long timespan, TimeUnit unit) {
        return this.buffer(timespan, unit, this.getTimer());
    }

    public final Stream<List<O>> buffer(long timespan, TimeUnit unit, Timer timer) {
        return this.buffer(Integer.MAX_VALUE, timespan, unit, timer);
    }

    public final Stream<List<O>> buffer(long timespan, long timeshift, TimeUnit unit) {
        return this.buffer(timespan, timeshift, unit, this.getTimer());
    }

    public final Stream<List<O>> buffer(final long timespan, final long timeshift, final TimeUnit unit, final Timer timer) {
        if (timespan == timeshift) {
            return this.buffer(timespan, unit, timer);
        }
        return this.lift(new Supplier<Action<O, List<O>>>(){

            @Override
            public Action<O, List<O>> get() {
                return new BufferShiftAction(Stream.this.getDispatcher(), Integer.MAX_VALUE, Integer.MAX_VALUE, timeshift, timespan, unit, timer);
            }
        });
    }

    public final Stream<List<O>> buffer(int maxSize, long timespan, TimeUnit unit) {
        return this.buffer(maxSize, timespan, unit, this.getTimer());
    }

    public final Stream<List<O>> buffer(final int maxSize, final long timespan, final TimeUnit unit, final Timer timer) {
        return this.lift(new Supplier<Action<O, List<O>>>(){

            @Override
            public Action<O, List<O>> get() {
                return new BufferAction(Stream.this.getDispatcher(), maxSize, timespan, unit, timer);
            }
        });
    }

    public final Stream<O> sort() {
        return this.sort(null);
    }

    public final Stream<O> sort(int maxCapacity) {
        return this.sort(maxCapacity, null);
    }

    public final Stream<O> sort(Comparator<? super O> comparator) {
        return this.sort((int)Math.min(Integer.MAX_VALUE, this.getCapacity()), comparator);
    }

    public final Stream<O> sort(final int maxCapacity, final Comparator<? super O> comparator) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new SortAction(Stream.this.getDispatcher(), maxCapacity, comparator);
            }
        });
    }

    public final Stream<Stream<O>> window() {
        return this.window((int)Math.min(Integer.MAX_VALUE, this.getCapacity()));
    }

    public final Stream<Stream<O>> window(final int backlog) {
        return this.lift(new Supplier<Action<O, Stream<O>>>(){

            @Override
            public Action<O, Stream<O>> get() {
                return new WindowAction(Stream.this.getEnvironment(), Stream.this.getDispatcher(), backlog);
            }
        });
    }

    public final Stream<Stream<O>> window(final int maxSize, final int skip) {
        if (maxSize == skip) {
            return this.window(maxSize);
        }
        return this.lift(new Supplier<Action<O, Stream<O>>>(){

            @Override
            public Action<O, Stream<O>> get() {
                return new WindowShiftAction(Stream.this.getEnvironment(), Stream.this.getDispatcher(), maxSize, skip);
            }
        });
    }

    public final Stream<Stream<O>> window(final Supplier<? extends Publisher<?>> boundarySupplier) {
        return this.lift(new Supplier<Action<O, Stream<O>>>(){

            @Override
            public Action<O, Stream<O>> get() {
                return new WindowWhenAction(Stream.this.getEnvironment(), Stream.this.getDispatcher(), boundarySupplier);
            }
        });
    }

    public final Stream<Stream<O>> window(final Publisher<?> bucketOpening, final Supplier<? extends Publisher<?>> boundarySupplier) {
        return this.lift(new Supplier<Action<O, Stream<O>>>(){

            @Override
            public Action<O, Stream<O>> get() {
                return new WindowShiftWhenAction(Stream.this.getEnvironment(), Stream.this.getDispatcher(), bucketOpening, boundarySupplier);
            }
        });
    }

    public final Stream<Stream<O>> window(long timespan, TimeUnit unit) {
        return this.window(timespan, unit, this.getTimer());
    }

    public final Stream<Stream<O>> window(long timespan, TimeUnit unit, Timer timer) {
        return this.window(Integer.MAX_VALUE, timespan, unit, timer);
    }

    public final Stream<Stream<O>> window(int maxSize, long timespan, TimeUnit unit) {
        return this.window(maxSize, timespan, unit, this.getEnvironment() == null ? Environment.timer() : this.getEnvironment().getTimer());
    }

    public final Stream<Stream<O>> window(final int maxSize, final long timespan, final TimeUnit unit, final Timer timer) {
        return this.lift(new Supplier<Action<O, Stream<O>>>(){

            @Override
            public Action<O, Stream<O>> get() {
                return new WindowAction(Stream.this.getEnvironment(), Stream.this.getDispatcher(), maxSize, timespan, unit, timer);
            }
        });
    }

    public final Stream<Stream<O>> window(long timespan, long timeshift, TimeUnit unit) {
        return this.window(timespan, timeshift, unit, this.getTimer());
    }

    public final Stream<Stream<O>> window(final long timespan, final long timeshift, final TimeUnit unit, final Timer timer) {
        if (timeshift == timespan) {
            return this.window(timespan, unit, timer);
        }
        return this.lift(new Supplier<Action<O, Stream<O>>>(){

            @Override
            public Action<O, Stream<O>> get() {
                return new WindowShiftAction(Stream.this.getEnvironment(), Stream.this.getDispatcher(), Integer.MAX_VALUE, Integer.MAX_VALUE, timespan, timeshift, unit, timer);
            }
        });
    }

    public final <K> Stream<GroupedStream<K, O>> groupBy(final Function<? super O, ? extends K> keyMapper) {
        return this.lift(new Supplier<Action<O, GroupedStream<K, O>>>(){

            @Override
            public Action<O, GroupedStream<K, O>> get() {
                return new GroupByAction(Stream.this.getEnvironment(), keyMapper, Stream.this.getDispatcher());
            }
        });
    }

    public final Stream<GroupedStream<Integer, O>> partition() {
        return this.partition(Environment.PROCESSORS);
    }

    public final Stream<GroupedStream<Integer, O>> partition(final int buckets) {
        return this.groupBy(new Function<O, Integer>(){

            @Override
            public Integer apply(O o) {
                int bucket = o.hashCode() % buckets;
                return bucket < 0 ? bucket + buckets : bucket;
            }
        });
    }

    public final Stream<O> reduce(@Nonnull BiFunction<O, O, O> fn) {
        return this.scan(fn).last();
    }

    public final <A> Stream<A> reduce(A initial, @Nonnull BiFunction<A, ? super O, A> fn) {
        return this.scan(initial, fn).last();
    }

    public final Stream<O> scan(@Nonnull BiFunction<O, O, O> fn) {
        return this.scan(null, fn);
    }

    public final <A> Stream<A> scan(final A initial, final @Nonnull BiFunction<A, ? super O, A> fn) {
        return this.lift(new Supplier<Action<O, A>>(){

            @Override
            public Action<O, A> get() {
                return new ScanAction(initial, fn);
            }
        });
    }

    public final Stream<Long> count() {
        return this.count(this.getCapacity());
    }

    public final Stream<Long> count(final long i) {
        return this.lift(new Supplier<Action<O, Long>>(){

            @Override
            public Action<O, Long> get() {
                return new CountAction(i);
            }
        });
    }

    public final Stream<O> throttle(long period) {
        Timer timer = this.getTimer();
        Assert.state(timer != null, "Cannot use default timer as no environment has been provided to this Stream");
        return this.throttle(period, timer);
    }

    public final Stream<O> throttle(final long period, final Timer timer) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new ThrottleRequestAction(Stream.this.getDispatcher(), timer, period);
            }
        });
    }

    public final Stream<O> requestWhen(final Function<? super Stream<? extends Long>, ? extends Publisher<? extends Long>> throttleStream) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new ThrottleRequestWhenAction(Stream.this.getDispatcher(), throttleStream);
            }
        });
    }

    public final Stream<O> timeout(long timeout) {
        return this.timeout(timeout, null);
    }

    public final Stream<O> timeout(long timeout, TimeUnit unit) {
        return this.timeout(timeout, unit, null);
    }

    public final Stream<O> timeout(long timeout, TimeUnit unit, Publisher<? extends O> fallback) {
        Timer timer = this.getTimer();
        Assert.state(timer != null, "Cannot use default timer as no environment has been provided to this Stream");
        return this.timeout(timeout, unit, fallback, timer);
    }

    public final Stream<O> timeout(final long timeout, final TimeUnit unit, final Publisher<? extends O> fallback, final Timer timer) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new TimeoutAction(Stream.this.getDispatcher(), fallback, timer, unit != null ? TimeUnit.MILLISECONDS.convert(timeout, unit) : timeout);
            }
        });
    }

    public <E> CompositeAction<E, O> combine() {
        throw new IllegalStateException("Cannot combine a single Stream");
    }

    public final Promise<O> next() {
        Promise d = new Promise(this.getDispatcher(), this.getEnvironment());
        this.subscribe(d);
        return d;
    }

    public final Promise<List<O>> toList() {
        return this.toList(-1L);
    }

    public final Promise<List<O>> toList(long maximum) {
        if (maximum > 0L) {
            return this.take(maximum).buffer().next();
        }
        return this.buffer(Integer.MAX_VALUE).next();
    }

    public Stream<O> env(final Environment environment) {
        return new Stream<O>(){

            @Override
            public void subscribe(Subscriber<? super O> s) {
                Stream.this.subscribe(s);
            }

            @Override
            public long getCapacity() {
                return Stream.this.getCapacity();
            }

            @Override
            public Dispatcher getDispatcher() {
                return Stream.this.getDispatcher();
            }

            @Override
            public Environment getEnvironment() {
                return environment;
            }
        };
    }

    public final CompletableBlockingQueue<O> toBlockingQueue() {
        return this.toBlockingQueue(-1);
    }

    public final CompletableBlockingQueue<O> toBlockingQueue(int maximum) {
        CompletableBlockingQueue blockingQueue;
        Stream<O> tail = this;
        if (maximum > 0) {
            blockingQueue = new CompletableBlockingQueue(maximum);
            tail = this.take(maximum);
        } else {
            blockingQueue = new CompletableBlockingQueue(1);
        }
        Consumer<Object> terminalConsumer = new Consumer<Object>(){

            @Override
            public void accept(Object o) {
                blockingQueue.complete();
            }
        };
        this.consume(new Consumer<O>(){

            @Override
            public void accept(O o) {
                try {
                    blockingQueue.put(o);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }, (Consumer<Throwable>)terminalConsumer, (Consumer<Void>)terminalConsumer);
        return blockingQueue;
    }

    public Stream<O> keepAlive() {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                return new Action<O, O>(){

                    @Override
                    protected void doNext(O ev) {
                        this.broadcastNext(ev);
                    }

                    @Override
                    protected void doShutdown() {
                    }
                };
            }
        });
    }

    public final <A> void subscribe(CompositeAction<O, A> subscriber) {
        this.subscribe(subscriber.input());
    }

    @Override
    public long getCapacity() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
        return this.getCapacity() < producerCapacity && this.getDispatcher().getClass() != TailRecurseDispatcher.class && dispatcher.getClass() != TailRecurseDispatcher.class;
    }

    public Timer getTimer() {
        return this.getEnvironment() == null ? Environment.timer() : this.getEnvironment().getTimer();
    }

    public PushSubscription<O> downstreamSubscription() {
        return null;
    }

    public boolean cancelSubscription(PushSubscription<O> oPushSubscription) {
        return false;
    }

    public Environment getEnvironment() {
        return null;
    }

    public Dispatcher getDispatcher() {
        return SynchronousDispatcher.INSTANCE;
    }

    public String toString() {
        return this.getClass().getSimpleName();
    }

    private static final class StreamDispatchedSubscribe<O>
    extends Stream<O>
    implements Consumer<Subscriber<? super O>> {
        private final Dispatcher currentDispatcher;
        private final Stream<O> stream;

        public StreamDispatchedSubscribe(Stream<O> stream, Dispatcher currentDispatcher) {
            this.currentDispatcher = currentDispatcher;
            this.stream = stream;
        }

        @Override
        public void accept(Subscriber<? super O> subscriber) {
            this.stream.subscribe(new SubscribeOn<O>(this.currentDispatcher, subscriber));
        }

        @Override
        public long getCapacity() {
            return this.stream.getCapacity();
        }

        @Override
        public Environment getEnvironment() {
            return this.stream.getEnvironment();
        }

        @Override
        public Dispatcher getDispatcher() {
            return this.stream.getDispatcher();
        }

        @Override
        public void subscribe(Subscriber<? super O> subscriber) {
            this.currentDispatcher.dispatch(subscriber, this, null);
        }
    }

    private static final class SubscribeOn<O>
    implements Subscriber<O>,
    Consumer<Subscription>,
    NonBlocking {
        private final Subscriber<? super O> subscriber;
        private final Action<O, ?> action;
        private final Dispatcher dispatcher;

        public SubscribeOn(Dispatcher dispatcher, Subscriber<? super O> subscriber) {
            this.dispatcher = dispatcher;
            this.subscriber = subscriber;
            this.action = Action.class.isAssignableFrom(subscriber.getClass()) ? (Action)subscriber : null;
        }

        @Override
        public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
            return this.action == null || this.action.isReactivePull(dispatcher, producerCapacity);
        }

        @Override
        public long getCapacity() {
            return this.action != null ? this.action.getCapacity() : Long.MAX_VALUE;
        }

        @Override
        public void accept(Subscription subscription) {
            this.subscriber.onSubscribe(subscription);
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (this.dispatcher.inContext()) {
                this.accept(s);
            } else {
                this.dispatcher.dispatch(s, this, null);
            }
        }

        @Override
        public void onNext(O o) {
            this.subscriber.onNext(o);
        }

        @Override
        public void onError(Throwable t) {
            this.subscriber.onError(t);
        }

        @Override
        public void onComplete() {
            this.subscriber.onComplete();
        }
    }
}

