/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.streams;

import com.aol.cyclops.streams.StreamUtils;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.jooq.lambda.tuple.Tuple;
import org.jooq.lambda.tuple.Tuple3;
import org.reactivestreams.Subscription;

public class FutureStreamUtils {
    public static <T, X extends Throwable> Tuple3<CompletableFuture<Subscription>, Runnable, CompletableFuture<Boolean>> forEachX(Stream<T> stream, long x, Consumer<? super T> consumerElement) {
        return FutureStreamUtils.forEachXEvents(stream, x, consumerElement, e -> {}, () -> {});
    }

    public static <T, X extends Throwable> Tuple3<CompletableFuture<Subscription>, Runnable, CompletableFuture<Boolean>> forEachXWithError(Stream<T> stream, long x, Consumer<? super T> consumerElement, Consumer<? super Throwable> consumerError) {
        return FutureStreamUtils.forEachXEvents(stream, x, consumerElement, consumerError, () -> {});
    }

    public static <T, X extends Throwable> Tuple3<CompletableFuture<Subscription>, Runnable, CompletableFuture<Boolean>> forEachXEvents(final Stream<T> stream, long x, final Consumer<? super T> consumerElement, final Consumer<? super Throwable> consumerError, final Runnable onComplete) {
        final CompletableFuture streamCompleted = new CompletableFuture();
        Subscription s = new Subscription(){
            Iterator<T> it;
            volatile boolean running;
            {
                this.it = stream.iterator();
                this.running = true;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void request(long n) {
                int i = 0;
                while ((long)i < n && this.running) {
                    block7: {
                        try {
                            if (this.it.hasNext()) {
                                consumerElement.accept(this.it.next());
                                break block7;
                            }
                            try {
                                onComplete.run();
                            }
                            finally {
                                streamCompleted.complete(true);
                                break;
                            }
                        }
                        catch (Throwable t) {
                            consumerError.accept(t);
                        }
                    }
                    ++i;
                }
            }

            public void cancel() {
                this.running = false;
            }
        };
        CompletableFuture<1> subscription = CompletableFuture.completedFuture(s);
        return Tuple.tuple(subscription, () -> s.request(x), streamCompleted);
    }

    public static <T, X extends Throwable> Tuple3<CompletableFuture<Subscription>, Runnable, CompletableFuture<Boolean>> forEachWithError(Stream<T> stream, Consumer<? super T> consumerElement, Consumer<? super Throwable> consumerError) {
        return FutureStreamUtils.forEachEvent(stream, consumerElement, consumerError, () -> {});
    }

    public static <T, X extends Throwable> Tuple3<CompletableFuture<Subscription>, Runnable, CompletableFuture<Boolean>> forEachEvent(Stream<T> stream, Consumer<? super T> consumerElement, final Consumer<? super Throwable> consumerError, final Runnable onComplete) {
        CompletableFuture subscription = new CompletableFuture();
        final CompletableFuture streamCompleted = new CompletableFuture();
        return Tuple.tuple(subscription, () -> {
            final Iterator it = stream.iterator();
            final Object UNSET = new Object();
            StreamUtils.stream(new Iterator<T>(){
                boolean errored = true;

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public boolean hasNext() {
                    boolean result = false;
                    try {
                        boolean bl = result = it.hasNext();
                        return bl;
                    }
                    catch (Throwable t) {
                        consumerError.accept(t);
                        this.errored = true;
                        boolean bl = true;
                        return bl;
                    }
                    finally {
                        if (!result) {
                            try {
                                onComplete.run();
                            }
                            finally {
                                streamCompleted.complete(true);
                            }
                        }
                    }
                }

                @Override
                public T next() {
                    try {
                        if (this.errored) {
                            Object object = UNSET;
                            return object;
                        }
                        Object e = it.next();
                        return e;
                    }
                    finally {
                        this.errored = false;
                    }
                }
            }).filter(t -> t != UNSET).forEach(consumerElement);
        }, streamCompleted);
    }
}

