/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.javaslang.reactivestreams;

import com.aol.cyclops.javaslang.reactivestreams.ReactiveStream;
import com.aol.simple.react.async.Queue;
import com.aol.simple.react.async.subscription.Continueable;
import com.aol.simple.react.async.subscription.Subscription;
import com.aol.simple.react.stream.traits.Continuation;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.reactivestreams.Subscriber;

public class JavaslangReactiveStreamsSubscriber<T>
implements Subscriber<T> {
    protected volatile Queue<T> queue;
    volatile org.reactivestreams.Subscription subscription;
    private volatile Stream<T> jdkStream;
    volatile Consumer errorHandler = e -> {};

    protected Stream<T> stream() {
        Subscription subscription = new Subscription();
        return this.queue.stream((Continueable)subscription);
    }

    public ReactiveStream<T> getStream() {
        return ReactiveStream.fromIterator(this.jdkStream.iterator());
    }

    public void onSubscribe(final org.reactivestreams.Subscription s) {
        Objects.requireNonNull(s);
        if (this.subscription != null) {
            s.cancel();
            return;
        }
        this.queue = new Queue(){

            public T get() {
                s.request(1L);
                return super.get();
            }
        };
        this.subscription = s;
        this.jdkStream = this.stream();
        s.request(1L);
    }

    public void onNext(T t) {
        Objects.requireNonNull(t);
        this.queue.add(t);
    }

    public void onError(Throwable t) {
        Objects.requireNonNull(t);
        this.errorHandler.accept(t);
    }

    public void onComplete() {
        if (this.queue != null) {
            this.queue.addContinuation(new Continuation(() -> {
                throw new Queue.ClosedQueueException();
            }));
            this.queue.close();
        }
    }

    public org.reactivestreams.Subscription getSubscription() {
        return this.subscription;
    }
}

