/*
 * Decompiled with CFR 0.152.
 */
package xdean.jex.extra.rx2;

import io.reactivex.Flowable;
import io.reactivex.Notification;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.annotations.SchedulerSupport;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import org.reactivestreams.Subscription;
import xdean.jex.util.lang.ExceptionUtil;

public class RxIterator {
    public static <T> Function<Flowable<T>, Iterator<T>> flowableIterator() {
        return o -> RxIterator.toIterator(o);
    }

    public static <T> Iterator<T> toIterator(Flowable<T> ob) {
        return new FlowableIterator<T>(ob);
    }

    public static <T> Function<Observable<T>, Iterator<T>> observableIterator(Scheduler scheduler) {
        return o -> RxIterator.toIterator(o, scheduler);
    }

    public static <T> Function<Observable<T>, Iterator<T>> observableIterator() {
        return o -> RxIterator.toIterator(o);
    }

    public static <T> Iterator<T> toIterator(Observable<T> ob, Scheduler scheduler) {
        return new ObservableIterator<T>(ob, scheduler);
    }

    @SchedulerSupport(value="io.reactivex:io")
    public static <T> Iterator<T> toIterator(Observable<T> ob) {
        return RxIterator.toIterator(ob, Schedulers.io());
    }

    public static final class FlowableIterator<T>
    implements Iterator<T> {
        private Notification<T> next = null;
        private LinkedBlockingQueue<Notification<T>> queue = new LinkedBlockingQueue(1);
        private boolean completed = false;
        private Subscription subscription;

        public FlowableIterator(Flowable<T> source) {
            source.materialize().subscribe(e -> this.queue.put((Notification<T>)e), e -> {
                this.completed = true;
            }, () -> {
                this.completed = true;
            }, s -> {
                this.subscription = s;
            });
        }

        @Override
        public boolean hasNext() {
            this.calcNext();
            return !this.completed;
        }

        @Override
        public T next() {
            this.calcNext();
            if (this.next == null) {
                throw new NoSuchElementException();
            }
            Object t = this.next.getValue();
            this.next = null;
            return (T)t;
        }

        private void calcNext() {
            if (this.completed) {
                return;
            }
            if (this.next == null) {
                this.subscription.request(1L);
                Notification take = ExceptionUtil.uncheck(() -> this.queue.take());
                if (take.isOnNext()) {
                    this.next = take;
                } else {
                    if (take.isOnError()) {
                        this.completed = true;
                        throw new RuntimeException(take.getError());
                    }
                    this.completed = true;
                }
            }
        }
    }

    public static final class ObservableIterator<T>
    implements Iterator<T> {
        private LinkedBlockingQueue<Notification<T>> queue = new LinkedBlockingQueue();
        private Notification<T> next = null;
        private boolean completed = false;

        public ObservableIterator(Observable<T> source, Scheduler scheduler) {
            source.materialize().subscribeOn(scheduler).subscribe(this.queue::put);
        }

        @Override
        public boolean hasNext() {
            this.calcNext();
            return !this.completed;
        }

        @Override
        public T next() {
            this.calcNext();
            if (this.next == null) {
                throw new NoSuchElementException();
            }
            Object t = this.next.getValue();
            this.next = null;
            return (T)t;
        }

        private void calcNext() {
            if (this.completed) {
                return;
            }
            if (this.next == null) {
                Notification take = ExceptionUtil.uncheck(this.queue::take);
                if (take.isOnNext()) {
                    this.next = take;
                } else {
                    if (take.isOnError()) {
                        this.completed = true;
                        throw new RuntimeException(take.getError());
                    }
                    this.completed = true;
                }
            }
        }
    }
}

