/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.stream.StreamMapper;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.WriteStream;

/**
 * A publisher that pipes every signal from an upstream publisher through a
 * {@link StreamMapper} before it reaches the downstream subscriber.
 *
 * <p>For each subscription the mapper is given the upstream {@link Subscription}
 * and a {@link WriteStream} that forwards to the downstream subscriber; it
 * returns the {@link WriteStream} that upstream signals are written to.
 *
 * <p>The mapped stream is held per subscription (not on the publisher), so
 * subscribing more than once does not cause one subscription's signals to be
 * routed into another subscription's stream.
 *
 * @param <U> the type of item emitted by the upstream publisher
 * @param <D> the type of item delivered to the downstream subscriber
 */
public class StreamMapPublisher<U, D>
implements TransformablePublisher<D> {
    private final Publisher<? extends U> upstream;
    private final StreamMapper<? super U, ? extends D> mapper;

    /**
     * @param upstream the publisher whose signals are to be mapped
     * @param mapper creates, per subscription, the stream that upstream signals are written to
     */
    public StreamMapPublisher(Publisher<? extends U> upstream, StreamMapper<? super U, ? extends D> mapper) {
        this.upstream = upstream;
        this.mapper = mapper;
    }

    /**
     * Subscribes to the upstream publisher and relays its signals through the mapper
     * to {@code downstreamSubscriber}.
     *
     * <p>If the mapper throws while creating the stream, the upstream subscription is
     * cancelled and the exception is passed to {@code downstreamSubscriber.onError};
     * any upstream signals that still arrive afterwards are dropped.
     *
     * @param downstreamSubscriber the subscriber that receives the mapped signals
     */
    @Override
    public void subscribe(final Subscriber<? super D> downstreamSubscriber) {
        this.upstream.subscribe(new Subscriber<U>(){
            // Per-subscription state: stays null if the mapper failed to produce a stream.
            private WriteStream<? super U> input;

            @Override
            public void onSubscribe(final Subscription upstreamSubscription) {
                try {
                    this.input = mapStream(upstreamSubscription, downstreamSubscriber, mapper);
                }
                catch (Exception e) {
                    upstreamSubscription.cancel();
                    downstreamSubscriber.onError(e);
                    return;
                }
                // Demand and cancellation from downstream are passed straight to upstream.
                downstreamSubscriber.onSubscribe(new Subscription(){

                    @Override
                    public void request(long n) {
                        upstreamSubscription.request(n);
                    }

                    @Override
                    public void cancel() {
                        upstreamSubscription.cancel();
                    }
                });
            }

            @Override
            public void onNext(U i) {
                // Upstream may still emit after cancel(); with no stream there is nowhere to write.
                if (this.input != null) {
                    this.input.item(i);
                }
            }

            @Override
            public void onError(Throwable t) {
                // Downstream has already been sent the mapping failure when input is null.
                if (this.input != null) {
                    this.input.error(t);
                }
            }

            @Override
            public void onComplete() {
                if (this.input != null) {
                    this.input.complete();
                }
            }
        });
    }

    /**
     * Asks the mapper for the stream that upstream signals are written to, handing it
     * a {@link WriteStream} that forwards items, errors and completion to the
     * downstream subscriber.
     *
     * @param upstreamSubscription the subscription to the upstream publisher
     * @param downstreamSubscriber the subscriber that the mapper's output is forwarded to
     * @param mapper the mapper that creates the input stream
     * @return the stream returned by {@code mapper.map}
     * @throws Exception any exception thrown by {@code mapper.map}
     */
    private static <U, D> WriteStream<U> mapStream(Subscription upstreamSubscription, final Subscriber<? super D> downstreamSubscriber, StreamMapper<U, D> mapper) throws Exception {
        return mapper.map(upstreamSubscription, new WriteStream<D>(){

            @Override
            public void item(D item) {
                downstreamSubscriber.onNext(item);
            }

            @Override
            public void error(Throwable throwable) {
                downstreamSubscriber.onError(throwable);
            }

            @Override
            public void complete() {
                downstreamSubscriber.onComplete();
            }
        });
    }
}

