/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.types.futurestream;

import com.aol.cyclops.control.LazyReact;
import com.aol.cyclops.control.SimpleReact;
import com.aol.cyclops.data.async.Queue;
import com.aol.cyclops.data.async.QueueFactory;
import com.aol.cyclops.data.collections.extensions.standard.ListX;
import com.aol.cyclops.internal.react.exceptions.FilteredExecutionPathException;
import com.aol.cyclops.internal.react.stream.EagerStreamWrapper;
import com.aol.cyclops.react.SimpleReactFailedStageException;
import com.aol.cyclops.react.StageWithResults;
import com.aol.cyclops.react.Status;
import com.aol.cyclops.react.async.subscription.Continueable;
import com.aol.cyclops.react.collectors.lazy.Blocker;
import com.aol.cyclops.types.futurestream.BaseSimpleReactStream;
import com.aol.cyclops.types.futurestream.BlockingStream;
import com.aol.cyclops.types.futurestream.BlockingStreamHelper;
import com.aol.cyclops.types.futurestream.ConfigurableStream;
import com.aol.cyclops.types.futurestream.EagerFutureStreamFunctions;
import com.aol.cyclops.types.futurestream.LazyFutureStream;
import com.aol.cyclops.types.futurestream.ToQueue;
import com.aol.cyclops.util.ThrowsSoftened;
import com.aol.cyclops.util.stream.StreamUtils;
import com.nurkiewicz.asyncretry.RetryExecutor;
import com.nurkiewicz.asyncretry.policy.AbortRetryException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.jooq.lambda.Seq;
import org.jooq.lambda.tuple.Tuple;
import org.jooq.lambda.tuple.Tuple2;

public interface SimpleReactStream<U>
extends BaseSimpleReactStream<U>,
BlockingStream<U>,
ConfigurableStream<U, CompletableFuture<U>>,
ToQueue<U> {
    /**
     * Returns the {@link SimpleReact} builder backing this stream. Default methods in this
     * interface pass it to {@code withNewStream(..)} and call {@code construct(..)} on it.
     */
    @Override
    public SimpleReact getSimpleReact();

    /**
     * Returns a stream whose last-active stage is the supplied wrapper. Every transforming
     * default method in this interface funnels its result through this call.
     * NOTE(review): whether this copies or mutates the receiver is implementation-defined.
     */
    public SimpleReactStream<U> withLastActive(EagerStreamWrapper var1);

    /**
     * Returns the wrapper around the futures of the most recent stage; default methods read it
     * via {@code stream()} / {@code list()}.
     */
    @Override
    public EagerStreamWrapper getLastActive();

    /**
     * Returns a stream configured with the given executor.
     * NOTE(review): presumably the executor later returned by {@code getTaskExecutor()} - confirm.
     */
    public SimpleReactStream<U> withTaskExecutor(Executor var1);

    /**
     * Returns a stream configured with the given retry executor.
     * NOTE(review): presumably the value later returned by {@code getRetrier()}, which
     * {@code retry(..)} uses - confirm.
     */
    @Override
    public SimpleReactStream<U> withRetrier(RetryExecutor var1);

    /** Returns a stream configured with the given queue factory (used by {@code flatMap}). */
    @Override
    public SimpleReactStream<U> withQueueFactory(QueueFactory<U> var1);

    /** Returns a stream configured with the given optional error handler (used by {@code capture}). */
    public SimpleReactStream<U> withErrorHandler(Optional<Consumer<Throwable>> var1);

    /** Returns a stream configured with the given subscription (used by {@code flatMap}). */
    @Override
    public SimpleReactStream<U> withSubscription(Continueable var1);

    /**
     * Returns a stream with the async flag set as given; {@code sync()} passes {@code false}
     * and {@code async()} passes {@code true}.
     */
    public SimpleReactStream<U> withAsync(boolean var1);

    /** Returns the subscription of this stream; read by {@code flatMap} and {@code join}. */
    @Override
    public Continueable getSubscription();

    /**
     * Pairs elements of this stream with elements of {@code right} by delegating to
     * {@link EagerFutureStreamFunctions#combineLatest}.
     *
     * @param right the stream to combine with
     * @return the sequence of pairs produced by the delegate
     */
    default public <T2> Seq<Tuple2<U, T2>> combineLatest(SimpleReactStream<T2> right) {
        final SimpleReactStream<U> left = this;
        return EagerFutureStreamFunctions.combineLatest(left, right);
    }

    /**
     * Pairs elements of this stream with elements of {@code right} by delegating to
     * {@link EagerFutureStreamFunctions#withLatest}.
     *
     * @param right the stream supplying the second tuple component
     * @return the sequence of pairs produced by the delegate
     */
    default public <T2> Seq<Tuple2<U, T2>> withLatest(SimpleReactStream<T2> right) {
        final SimpleReactStream<U> left = this;
        return EagerFutureStreamFunctions.withLatest(left, right);
    }

    /**
     * Adds a peek stage that hands this stream itself (not the element) to {@code consumer}
     * once per element.
     *
     * @param consumer receives this stream for every element passing through the peek stage
     * @return the stream returned by {@link #peek(Consumer)}
     */
    default public SimpleReactStream<U> self(Consumer<SimpleReactStream<U>> consumer) {
        final SimpleReactStream<U> current = this;
        return this.peek(ignored -> consumer.accept(current));
    }

    /**
     * Invokes {@code collect()} on the last-active wrapper and discards whatever it returns.
     */
    default public void run() {
        final EagerStreamWrapper active = this.getLastActive();
        active.collect();
    }

    /**
     * Splits the futures of the last-active stage at {@code position} and wraps each half in a
     * stream built via {@link #fromListCompletableFuture(List)}.
     *
     * @param position index at which the underlying sequence is split
     * @return a tuple of (futures before position, futures from position onwards)
     */
    default public Tuple2<SimpleReactStream<U>, SimpleReactStream<U>> splitAt(long position) {
        final Stream<CompletableFuture> futures = this.getLastActive().stream();
        final Tuple2<Seq<CompletableFuture>, Seq<CompletableFuture>> halves = Seq.seq(futures).splitAt(position);
        // Each half is materialised and wrapped in turn, first half before second.
        final List head = halves.v1.collect(Collectors.toList());
        final SimpleReactStream<U> first = this.fromListCompletableFuture(head);
        final List tail = halves.v2.collect(Collectors.toList());
        final SimpleReactStream<U> second = this.fromListCompletableFuture(tail);
        return new Tuple2(first, second);
    }

    /**
     * Duplicates the futures of the last-active stage and wraps each copy via
     * {@link #fromStreamOfFutures(Stream)}.
     *
     * @return a tuple holding the two resulting streams
     */
    default public Tuple2<SimpleReactStream<U>, SimpleReactStream<U>> duplicate() {
        final Stream<CompletableFuture> futures = this.getLastActive().stream();
        final Tuple2<Seq<CompletableFuture>, Seq<CompletableFuture>> copies = Seq.seq(futures).duplicate();
        final SimpleReactStream<U> first = this.fromStreamOfFutures((Stream) copies.v1);
        final SimpleReactStream<U> second = this.fromStreamOfFutures((Stream) copies.v2);
        return new Tuple2(first, second);
    }

    /**
     * Zips the futures of this stream with the elements of {@code other}. Each resulting future
     * completes with a tuple of (this future's value, the paired element).
     *
     * @param other the stream whose elements become the second tuple component
     * @return a stream of the zipped tuples
     */
    default public <R> SimpleReactStream<Tuple2<U, R>> zip(Stream<R> other) {
        final Seq untyped = Seq.seq(this.getLastActive().stream()).zip(Seq.seq(other));
        final Seq<Tuple2<CompletableFuture<U>, R>> pairs = untyped;
        final Stream zipped = pairs.map(
                pair -> pair.v1.thenApply(ignored -> Tuple.tuple(pair.v1.join(), pair.v2)));
        return this.fromStreamOfFutures(zipped);
    }

    /**
     * Zips the futures of this stream with the futures of {@code other}. Each resulting future
     * completes, once both paired futures are done, with a tuple of their two values.
     *
     * @param other the stream whose futures supply the second tuple component
     * @return a stream of the zipped tuples
     */
    default public <R> SimpleReactStream<Tuple2<U, R>> zip(SimpleReactStream<R> other) {
        final Seq untyped = Seq.seq(this.getLastActive().stream()).zip(Seq.seq(other.getLastActive().stream()));
        final Seq<Tuple2<CompletableFuture<U>, CompletableFuture<R>>> pairs = untyped;
        final Stream zipped = pairs.map(
                pair -> CompletableFuture.allOf(pair.v1, pair.v2)
                        .thenApply(ignored -> Tuple.tuple(pair.v1.join(), pair.v2.join())));
        return this.fromStreamOfFutures(zipped);
    }

    /**
     * Pairs every future of the last-active stage with its index in the sequence. Each
     * resulting future completes with a tuple of (value, index).
     *
     * @return a stream of (value, index) tuples
     */
    default public SimpleReactStream<Tuple2<U, Long>> zipWithIndex() {
        final Seq untyped = Seq.seq(this.getLastActive().stream().iterator()).zipWithIndex();
        final Seq<Tuple2<CompletableFuture<U>, Long>> indexed = untyped;
        final Stream zipped = indexed.map(
                pair -> pair.v1.thenApply(ignored -> Tuple.tuple(pair.v1.join(), pair.v2)));
        return this.fromStreamOfFutures(zipped);
    }

    /**
     * Selects one of the supplied streams by delegating to
     * {@link EagerFutureStreamFunctions#firstOf}.
     *
     * @param futureStreams candidate streams
     * @return the stream chosen by the delegate
     */
    @SafeVarargs
    public static <U> SimpleReactStream<U> firstOf(SimpleReactStream<U> ... futureStreams) {
        final SimpleReactStream<U> winner = EagerFutureStreamFunctions.firstOf(futureStreams);
        return winner;
    }

    /**
     * Returns a stream whose last-active futures are those of this stream in reverse order.
     *
     * <p>The iterator must start at the end of the list: a plain {@code listIterator()} is
     * positioned at index 0, where {@code hasPrevious()} is immediately {@code false}, so
     * walking backwards from there would always yield an empty result.
     *
     * @return a stream over the same futures, last first
     */
    default public SimpleReactStream<U> reverse() {
        final EagerStreamWrapper lastActive = this.getLastActive();
        final List<CompletableFuture> futures = lastActive.list();
        final ListIterator<CompletableFuture> it = futures.listIterator(futures.size());
        final List<CompletableFuture> reversed = new ArrayList<CompletableFuture>(futures.size());
        while (it.hasPrevious()) {
            reversed.add(it.previous());
        }
        return this.withLastActive(lastActive.withList(reversed));
    }

    /**
     * Keeps the futures selected by {@code Seq.slice(from, to)} over the last-active stage.
     *
     * @param from start bound passed to {@code Seq.slice}
     * @param to end bound passed to {@code Seq.slice}
     * @return a stream over the selected futures
     */
    default public SimpleReactStream<U> slice(long from, long to) {
        final Seq<CompletableFuture> window = Seq.seq(this.getLastActive().stream()).slice(from, to);
        final List selected = window.collect(Collectors.toList());
        return this.fromListCompletableFuture(selected);
    }

    /**
     * Keeps at most the first {@code maxSize} futures of the last-active stage.
     *
     * @param maxSize maximum number of futures to retain
     * @return a stream over the retained futures
     */
    default public SimpleReactStream<U> limit(long maxSize) {
        final EagerStreamWrapper active = this.getLastActive();
        final List<CompletableFuture> kept = active.stream()
                .limit(maxSize)
                .collect(Collectors.toList());
        return this.withLastActive(active.withList(kept));
    }

    /**
     * Drops the first {@code n} futures of the last-active stage.
     *
     * @param n number of leading futures to discard
     * @return a stream over the remaining futures
     */
    @Override
    default public SimpleReactStream<U> skip(long n) {
        final EagerStreamWrapper active = this.getLastActive();
        final List<CompletableFuture> remaining = active.stream()
                .skip(n)
                .collect(Collectors.toList());
        return this.withLastActive(active.withList(remaining));
    }

    /**
     * Delegates to {@link EagerFutureStreamFunctions#skipUntil} with this stream and {@code s}.
     *
     * @param s the stream passed as second argument to the delegate
     * @return the sequence produced by the delegate
     */
    default public <T> Seq<U> skipUntil(SimpleReactStream<T> s) {
        final SimpleReactStream<U> source = this;
        return EagerFutureStreamFunctions.skipUntil(source, s);
    }

    /**
     * Delegates to {@link EagerFutureStreamFunctions#takeUntil} with this stream and {@code s}.
     *
     * @param s the stream passed as second argument to the delegate
     * @return the sequence produced by the delegate
     */
    default public <T> Seq<U> takeUntil(SimpleReactStream<T> s) {
        final SimpleReactStream<U> source = this;
        return EagerFutureStreamFunctions.takeUntil(source, s);
    }

    /**
     * Calls {@code cancel(true)} on every future returned by
     * {@link #streamCompletableFutures()}.
     */
    default public void cancel() {
        final Stream<CompletableFuture<Object>> pending = this.<Object>streamCompletableFutures();
        pending.forEach(future -> future.cancel(true));
    }

    /**
     * Produces {@code times} streams over the futures of the last-active stage. The futures'
     * iterator is fed through {@code StreamUtils.toBufferingCopier}; each resulting iterator is
     * turned into a sequential, ordered stream and handed to {@code getSimpleReact().construct}.
     *
     * @param times number of copies requested from the buffering copier
     * @return a list of the constructed streams
     */
    default public ListX<SimpleReactStream<U>> copySimpleReactStream(int times) {
        return StreamUtils.toBufferingCopier(this.getLastActive().stream().iterator(), times)
                .stream()
                // java.util.Spliterator.ORDERED is the compile-time constant 16 (0x10).
                .map(it -> StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(it, java.util.Spliterator.ORDERED), false))
                .map(fs -> this.getSimpleReact().construct((Stream)fs))
                .toListX();
    }

    /**
     * Applies {@code fn} asynchronously to every future of the last-active stage, running the
     * function on the supplied executor.
     *
     * <p>Previously {@code service} was ignored and the stream's own task executor was always
     * used. The task executor is now only a fallback for a {@code null} {@code service}, so
     * callers that passed {@code null} keep working.
     *
     * @param fn function applied to each completed value; failures are wrapped by
     *           {@link #handleExceptions(Function)}
     * @param service executor on which {@code fn} runs; {@code null} selects
     *                {@code getTaskExecutor()}
     * @return a stream over the mapped futures
     */
    @Override
    default public <R> SimpleReactStream<R> then(Function<? super U, ? extends R> fn, Executor service) {
        final Function<Stream<CompletableFuture>, Stream<CompletableFuture>> streamMapper =
                s -> s.map(ft -> ft.thenApplyAsync(
                        SimpleReactStream.handleExceptions(fn),
                        service != null ? service : this.getTaskExecutor()));
        // Unchecked: the wrapper is untyped, so the element type is re-asserted here.
        return (SimpleReactStream<R>) this.withLastActive(this.getLastActive().stream(streamMapper));
    }

    /**
     * Applies {@code fn} to every future of the last-active stage using the non-async
     * {@code thenApply}.
     *
     * @param fn function applied to each completed value; failures are wrapped by
     *           {@link #handleExceptions(Function)}
     * @return a stream over the mapped futures
     */
    @Override
    default public <R> SimpleReactStream<R> thenSync(Function<? super U, ? extends R> fn) {
        final Function<Stream<CompletableFuture>, Stream<CompletableFuture>> streamMapper =
                futures -> futures.map(future -> future.thenApply(SimpleReactStream.handleExceptions(fn)));
        final EagerStreamWrapper mapped = this.getLastActive().stream(streamMapper);
        return (SimpleReactStream<R>) this.withLastActive(mapped);
    }

    /**
     * Builds a single-future stage that fires once every future of the last-active stage has
     * finished, then applies {@code fn} to the results aggregated with {@code collector}.
     *
     * @param collector passed to {@code BlockingStreamHelper.aggregateResultsCompletable} to
     *                  aggregate the futures' results
     * @param fn function applied to the aggregated value
     * @return a stream whose last-active wrapper holds the one resulting future
     */
    default public <R1, R2> SimpleReactStream<R2> allOf(Collector<? super U, ?, R1> collector, Function<? super R1, ? extends R2> fn) {
        // Snapshot of the current futures; reused by every step below.
        CompletableFuture[] array = SimpleReactStream.lastActiveArray(this.getLastActive());
        CompletableFuture<Void> cf = CompletableFuture.allOf(array);
        // Failure path: pass the exception to BlockingStreamHelper.capture with this stream's
        // error handler, then return the result of BlockingStreamHelper.block over the same
        // futures so the chain continues with a normal value.
        Function<Exception, Object> f = e -> {
            BlockingStreamHelper.capture(e, this.getErrorHandler());
            return BlockingStreamHelper.block((BlockingStream)this, Collectors.toList(), new EagerStreamWrapper(Stream.of(array), this.getErrorHandler()));
        };
        CompletionStage onFail = cf.exceptionally(f);
        // Continue on the task executor: the aggregation and fn.apply are submitted through
        // StageWithResults. NOTE(review): submit(..) presumably waits for the submitted task's
        // result - confirm against StageWithResults.
        CompletionStage onSuccess = ((CompletableFuture)onFail).thenApplyAsync(result -> new StageWithResults(this.getTaskExecutor(), null, result).submit(() -> fn.apply((Object)BlockingStreamHelper.aggregateResultsCompletable(collector, Stream.of(array).collect(Collectors.toList()), this.getErrorHandler()))), this.getTaskExecutor());
        return this.withLastActive(new EagerStreamWrapper((CompletableFuture)onSuccess, this.getErrorHandler()));
    }

    /**
     * Builds a single-future stage that applies {@code fn}, on the task executor, to the
     * result of {@code CompletableFuture.anyOf} over the last-active futures.
     *
     * @param fn function applied to the result of {@code anyOf}
     * @return a stream whose last-active wrapper holds the one resulting future
     */
    default public <R> SimpleReactStream<R> anyOf(Function<? super U, ? extends R> fn) {
        final CompletableFuture[] candidates = SimpleReactStream.lastActiveArray(this.getLastActive());
        final CompletableFuture<Object> firstDone = CompletableFuture.anyOf(candidates);
        final CompletionStage applied = firstDone.thenApplyAsync(fn, this.getTaskExecutor());
        final EagerStreamWrapper single = new EagerStreamWrapper((CompletableFuture)applied, this.getErrorHandler());
        return this.withLastActive(single);
    }

    /**
     * Copies the futures held by {@code lastActive} into a new array.
     *
     * @param lastActive wrapper whose {@code list()} is copied
     * @return an array containing the wrapper's futures in list order
     */
    public static CompletableFuture[] lastActiveArray(EagerStreamWrapper lastActive) {
        final List<CompletableFuture> futures = lastActive.list();
        return futures.toArray(new CompletableFuture[0]);
    }

    /**
     * Applies {@code fn} to every future of the last-active stage through the configured
     * retrier. Each mapping runs on the task executor and joins the future returned by
     * {@code getRetrier().getWithRetry(..)}.
     *
     * @param fn function to apply; wrapped by {@link #handleExceptions(Function)}
     * @return a stream over the mapped futures
     */
    @Override
    default public <R> SimpleReactStream<R> retry(Function<? super U, ? extends R> fn) {
        final Function<Stream<CompletableFuture>, Stream<CompletableFuture>> mapper =
                futures -> futures.map(future -> future.thenApplyAsync(
                        result -> this.getRetrier()
                                .getWithRetry(() -> SimpleReactStream.<U, R>handleExceptions(fn).apply((U) result))
                                .join(),
                        this.getTaskExecutor()));
        final EagerStreamWrapper retried = this.getLastActive().stream(mapper);
        return (SimpleReactStream<R>) this.withLastActive(retried);
    }

    /**
     * Wraps every element of {@code stream} in an already-completed future and installs the
     * result as the last-active stage via {@code withNewStream}.
     *
     * @param stream values to wrap
     * @return a stream over the completed futures
     */
    @Override
    default public <R> SimpleReactStream<R> fromStream(Stream<R> stream) {
        final EagerStreamWrapper active = this.getLastActive();
        final Stream<CompletableFuture> completed = stream.map(CompletableFuture::completedFuture);
        final EagerStreamWrapper replaced = active.withNewStream(completed, this.getSimpleReact());
        return (SimpleReactStream<R>) this.withLastActive(replaced);
    }

    /**
     * Installs {@code stream} as the last-active stage via
     * {@code withNewStream(stream, getSimpleReact())}.
     *
     * @param stream futures forming the new stage
     * @return a stream over the supplied futures
     */
    default public <R> SimpleReactStream<R> fromStreamOfFutures(Stream<CompletableFuture<R>> stream) {
        // The wrapper API is untyped, so the element type is erased before the hand-over.
        final Stream untyped = stream;
        final EagerStreamWrapper replaced = this.getLastActive().withNewStream(untyped, this.getSimpleReact());
        return (SimpleReactStream<R>) this.withLastActive(replaced);
    }

    /**
     * Installs {@code stream} as the last-active stage via {@code withStream(stream)}.
     *
     * @param stream futures forming the new stage
     * @return a stream over the supplied futures
     */
    default public <R> SimpleReactStream<R> fromStreamCompletableFutureReplace(Stream<CompletableFuture<R>> stream) {
        // The wrapper API is untyped, so the element type is erased before the hand-over.
        final Stream untyped = stream;
        final EagerStreamWrapper replaced = this.getLastActive().withStream(untyped);
        return (SimpleReactStream<R>) this.withLastActive(replaced);
    }

    /**
     * Installs {@code list} as the last-active stage via {@code withList(list)}.
     *
     * @param list futures forming the new stage
     * @return a stream over the supplied futures
     */
    default public <R> SimpleReactStream<R> fromListCompletableFuture(List<CompletableFuture<R>> list) {
        // The wrapper API is untyped, so the element type is erased before the hand-over.
        final List untyped = list;
        final EagerStreamWrapper replaced = this.getLastActive().withList(untyped);
        return (SimpleReactStream<R>) this.withLastActive(replaced);
    }

    /**
     * Applies {@code fn} to every future of the last-active stage. When the stream is async the
     * function runs via {@code thenApplyAsync} on the task executor; otherwise this defers to
     * {@link #thenSync(Function)}.
     *
     * @param fn function applied to each completed value; failures are wrapped by
     *           {@link #handleExceptions(Function)}
     * @return a stream over the mapped futures
     */
    @Override
    default public <R> SimpleReactStream<R> then(Function<? super U, ? extends R> fn) {
        if (this.isAsync()) {
            final Function<Stream<CompletableFuture>, Stream<CompletableFuture>> streamMapper =
                    futures -> futures.map(future -> future.thenApplyAsync(
                            SimpleReactStream.handleExceptions(fn), this.getTaskExecutor()));
            return (SimpleReactStream<R>) this.withLastActive(this.getLastActive().stream(streamMapper));
        }
        return this.thenSync(fn);
    }

    /**
     * Passes each value to {@code consumer} and forwards the value unchanged. Uses
     * {@link #then(Function)} when the stream is async, {@link #peekSync(Consumer)} otherwise.
     *
     * @param consumer invoked with each value
     * @return a stream carrying the same values
     */
    @Override
    default public SimpleReactStream<U> peek(Consumer<? super U> consumer) {
        if (this.isAsync()) {
            return this.then(value -> {
                consumer.accept(value);
                return value;
            });
        }
        return this.peekSync(consumer);
    }

    /**
     * Passes each value to {@code consumer} through {@link #thenSync(Function)} and forwards
     * the value unchanged.
     *
     * @param consumer invoked with each value
     * @return a stream carrying the same values
     */
    @Override
    default public SimpleReactStream<U> peekSync(Consumer<? super U> consumer) {
        final Function<U, U> tap = value -> {
            consumer.accept(value);
            return value;
        };
        return this.thenSync(tap);
    }

    /**
     * Decorates {@code fn} so that anything it throws is rethrown as a
     * {@link SimpleReactFailedStageException} carrying the offending input. An
     * {@link AbortRetryException} is the exception to this and is rethrown untouched.
     *
     * @param fn the function to guard
     * @return a function with the same result as {@code fn} on success
     */
    public static <U, R> Function<U, R> handleExceptions(Function<? super U, ? extends R> fn) {
        return input -> {
            try {
                return fn.apply(input);
            }
            catch (Throwable failure) {
                if (!(failure instanceof AbortRetryException)) {
                    throw new SimpleReactFailedStageException(input, failure);
                }
                // Abort signals must reach the retrier unwrapped.
                throw failure;
            }
        };
    }

    /**
     * Composes every future of the last-active stage with the future returned by
     * {@code flatFn}. Uses {@code thenComposeAsync} on the task executor when the stream is
     * async, and {@link #flatMapToCompletableFutureSync(Function)} otherwise.
     *
     * @param flatFn maps a value to the future that replaces it; wrapped by
     *               {@link #handleExceptions(Function)}
     * @return a stream over the composed futures
     */
    @Override
    default public <R> SimpleReactStream<R> flatMapToCompletableFuture(Function<? super U, CompletableFuture<? extends R>> flatFn) {
        if (this.isAsync()) {
            final Function<Stream<CompletableFuture>, Stream<CompletableFuture>> streamMapper =
                    futures -> futures.map(future -> future.thenComposeAsync(
                            SimpleReactStream.handleExceptions(flatFn), this.getTaskExecutor()));
            return (SimpleReactStream<R>) this.withLastActive(this.getLastActive().stream(streamMapper));
        }
        return this.flatMapToCompletableFutureSync(flatFn);
    }

    /**
     * Composes every future of the last-active stage with the future returned by
     * {@code flatFn}, using the non-async {@code thenCompose}.
     *
     * @param flatFn maps a value to the future that replaces it; wrapped by
     *               {@link #handleExceptions(Function)}
     * @return a stream over the composed futures
     */
    @Override
    default public <R> SimpleReactStream<R> flatMapToCompletableFutureSync(Function<? super U, CompletableFuture<? extends R>> flatFn) {
        final Function<Stream<CompletableFuture>, Stream<CompletableFuture>> streamMapper =
                futures -> futures.map(future -> future.thenCompose(SimpleReactStream.handleExceptions(flatFn)));
        final EagerStreamWrapper composed = this.getLastActive().stream(streamMapper);
        return (SimpleReactStream<R>) this.withLastActive(composed);
    }

    /**
     * Flat-maps the values of this stream with {@code flatFn}.
     *
     * <p>The values are read through {@code toQueue().stream(getSubscription())} and
     * flat-mapped with {@code flatFn}. The result is loaded via {@code fromStream} into a
     * stream constructed by {@code getSimpleReact()}, which is given this stream's
     * subscription and queue factory.
     *
     * @param flatFn maps each value to a stream of replacement values
     * @return the stream returned by {@code fromStream} over the flat-mapped values
     */
    @Override
    default public <R> SimpleReactStream<R> flatMap(Function<? super U, ? extends Stream<? extends R>> flatFn) {
        // Stream.of(new Object[0]) is an empty seed; the real content arrives through fromStream(..).
        return this.getSimpleReact().construct(Stream.of(new Object[0])).withSubscription(this.getSubscription()).withQueueFactory(this.getQueueFactory()).fromStream(this.toQueue().stream(this.getSubscription()).flatMap(flatFn));
    }

    /**
     * Applies {@code fn} asynchronously, on the task executor, to every future of the
     * last-active stage and returns the resulting futures directly, not wrapped in a stream.
     *
     * @param fn function applied to each completed value
     * @return the list of mapped futures
     */
    default public <R> List<CompletableFuture<R>> with(Function<? super U, ? extends R> fn) {
        final Stream<CompletableFuture> futures = this.getLastActive().stream();
        final List mapped = futures
                .map(future -> future.thenApplyAsync(fn, this.getTaskExecutor()))
                .collect(Collectors.toList());
        return mapped;
    }

    /**
     * Removes values that fail predicate {@code p}. A rejected value makes its future complete
     * with a {@link FilteredExecutionPathException}.
     *
     * <p>When the stream is async the predicate now runs on {@code getTaskExecutor()}, the same
     * executor the other async operators of this interface use. Previously the single-argument
     * {@code thenApplyAsync} was called, which runs on the JDK's default asynchronous pool and
     * bypasses the configured executor. When the stream is not async this defers to
     * {@link #filterSync(Predicate)}.
     *
     * @param p predicate deciding which values are kept
     * @return a stream whose rejected futures are exceptionally completed
     */
    @Override
    default public SimpleReactStream<U> filter(Predicate<? super U> p) {
        if (!this.isAsync()) {
            return this.filterSync(p);
        }
        final Function<Stream<CompletableFuture>, Stream<CompletableFuture>> fn = s -> s.map(ft -> ft.thenApplyAsync(in -> {
            if (!p.test((U) in)) {
                throw new FilteredExecutionPathException();
            }
            return in;
        }, this.getTaskExecutor()));
        return this.withLastActive(this.getLastActive().stream(fn));
    }

    /**
     * Removes values that fail predicate {@code p}, evaluating it with the non-async
     * {@code thenApply}. A rejected value makes its future complete with a
     * {@link FilteredExecutionPathException}.
     *
     * @param p predicate deciding which values are kept
     * @return a stream whose rejected futures are exceptionally completed
     */
    @Override
    default public SimpleReactStream<U> filterSync(Predicate<? super U> p) {
        final Function<Stream<CompletableFuture>, Stream<CompletableFuture>> keepMatching =
                futures -> futures.map(future -> future.thenApply(value -> {
                    if (p.test((U) value)) {
                        return value;
                    }
                    throw new FilteredExecutionPathException();
                }));
        final EagerStreamWrapper filtered = this.getLastActive().stream(keepMatching);
        return this.withLastActive(filtered);
    }

    /**
     * Exposes the futures of the last-active stage as a stream, typed as the caller requests.
     *
     * @return the stream returned by {@code getLastActive().stream()}
     */
    @Override
    default public <T> Stream<CompletableFuture<T>> streamCompletableFutures() {
        // The wrapper hands out untyped futures; the element type is the caller's assertion.
        return (Stream) this.getLastActive().stream();
    }

    /**
     * Concatenates the futures of this stream with those of every stream in {@code s}, in
     * argument order, into one new last-active wrapper that uses this stream's error handler.
     *
     * @param s streams whose futures are appended after this stream's futures
     * @return a stream over all collected futures
     */
    default public SimpleReactStream<U> merge(SimpleReactStream<U> ... s) {
        final List<CompletableFuture> merged = new ArrayList<CompletableFuture>(this.getLastActive().list());
        for (final SimpleReactStream<U> other : s) {
            merged.addAll(other.getLastActive().list());
        }
        return this.withLastActive(new EagerStreamWrapper(merged, this.getErrorHandler()));
    }

    /**
     * Blocks on the futures of the last-active stage through a {@link Blocker} built from
     * those futures and this stream's error handler.
     *
     * @param breakout predicate handed to {@code Blocker.block}
     * @return the list returned by the blocker
     */
    @ThrowsSoftened(value={InterruptedException.class, ExecutionException.class})
    default public ListX<U> block(Predicate<Status> breakout) {
        final List<CompletableFuture> futures = this.getLastActive().list();
        final Blocker blocker = new Blocker(futures, this.getErrorHandler());
        return blocker.block(breakout);
    }

    /**
     * Blocks via {@link #block(Predicate)} and reduces the returned values with
     * {@code collector}.
     *
     * @param collector collector applied to the blocked results
     * @param breakout predicate handed to {@link #block(Predicate)}
     * @return the collector's result
     */
    @ThrowsSoftened(value={InterruptedException.class, ExecutionException.class})
    default public <R> R block(Collector collector, Predicate<Status> breakout) {
        final ListX<U> results = this.block(breakout);
        return (R) results.stream().collect(collector);
    }

    /**
     * Concatenates the futures of {@code s1} and {@code s2}, in that order, into a new
     * last-active wrapper installed on {@code s1} with {@code s1}'s error handler.
     *
     * @param s1 first stream; also supplies the error handler and the returned stream
     * @param s2 second stream
     * @return the stream returned by {@code s1.withLastActive(..)}
     */
    public static <R> SimpleReactStream<R> merge(SimpleReactStream s1, SimpleReactStream s2) {
        final List<CompletableFuture> first = s1.getLastActive().list();
        final List<CompletableFuture> second = s2.getLastActive().list();
        final List<CompletableFuture> merged = new ArrayList<CompletableFuture>(first);
        merged.addAll(second);
        return s1.withLastActive(new EagerStreamWrapper(merged, s1.getErrorHandler()));
    }

    /**
     * Recovers from any failure by delegating to {@link #onFail(Class, Function)} with
     * {@code Throwable.class}.
     *
     * @param fn recovery function producing a replacement value
     * @return a stream with the recovery stage attached
     */
    @Override
    default public SimpleReactStream<U> onFail(Function<? super SimpleReactFailedStageException, ? extends U> fn) {
        final Class<? extends Throwable> anyFailure = Throwable.class;
        return this.onFail(anyFailure, fn);
    }

    /**
     * Attaches a recovery stage to every future of the last-active stage. When a future failed
     * and the cause is an instance of {@code exceptionClass}, {@code fn} supplies a replacement
     * value; otherwise the failure is propagated as a
     * {@link SimpleReactFailedStageException}.
     *
     * <p>Filtered elements (see {@code filter}) are never recovered. {@code exceptionally}
     * receives failures from earlier stages wrapped in a {@link CompletionException}, so the
     * filter marker is looked for after unwrapping. Checking only the raw argument let
     * filtered elements reach {@code fn}.
     *
     * @param exceptionClass failure type that {@code fn} handles
     * @param fn recovery function producing a replacement value
     * @return a stream with the recovery stage attached
     */
    @Override
    default public SimpleReactStream<U> onFail(Class<? extends Throwable> exceptionClass, Function<? super SimpleReactFailedStageException, ? extends U> fn) {
        final Function<Stream<CompletableFuture>, Stream<CompletableFuture>> mapper = s -> s.map(ft -> ft.exceptionally(t -> {
            Throwable throwable = (Throwable)t;
            // Unwrap only when there is something inside; a bare CompletionException is the failure itself.
            if (throwable instanceof CompletionException && throwable.getCause() != null) {
                throwable = throwable.getCause();
            }
            if (throwable instanceof FilteredExecutionPathException) {
                throw (FilteredExecutionPathException)throwable;
            }
            final SimpleReactFailedStageException simpleReactException = SimpleReactStream.assureSimpleReactException(throwable);
            final Throwable cause = simpleReactException.getCause();
            if (cause != null && exceptionClass.isAssignableFrom(cause.getClass())) {
                return fn.apply(simpleReactException);
            }
            throw simpleReactException;
        }));
        return this.withLastActive(this.getLastActive().stream(mapper));
    }

    /**
     * Returns {@code throwable} itself when it already is a
     * {@link SimpleReactFailedStageException}; otherwise wraps it in a new one with a
     * {@code null} value.
     *
     * @param throwable the failure to normalise
     * @return a {@link SimpleReactFailedStageException} representing the failure
     */
    public static SimpleReactFailedStageException assureSimpleReactException(Throwable throwable) {
        if (!(throwable instanceof SimpleReactFailedStageException)) {
            return new SimpleReactFailedStageException((Object)null, throwable);
        }
        return (SimpleReactFailedStageException)throwable;
    }

    /**
     * Installs {@code errorHandler} both on the last-active wrapper and on the stream itself.
     *
     * @param errorHandler consumer for captured failures; must not be {@code null}
     *                     ({@code Optional.of} rejects it)
     * @return a stream configured with the handler
     */
    @Override
    default public SimpleReactStream<U> capture(Consumer<Throwable> errorHandler) {
        final EagerStreamWrapper active = this.getLastActive();
        final Optional<Consumer<Throwable>> handler = Optional.of(errorHandler);
        final SimpleReactStream<U> rewired = this.withLastActive(active.withErrorHandler(handler));
        return rewired.withErrorHandler(handler);
    }

    /**
     * Applies {@code fn} to the list of all results by delegating to
     * {@link #allOf(Collector, Function)} with a list collector.
     *
     * @param fn function applied to the collected list
     * @return the stream returned by the two-argument overload
     */
    default public <R> SimpleReactStream<R> allOf(Function<? super List<U>, ? extends R> fn) {
        final Collector<U, ?, List<U>> asList = Collectors.toList();
        return this.allOf(asList, fn);
    }

    /**
     * Builds a {@link LazyFutureStream} over the futures of the last-active stage, using a
     * {@link LazyReact} created from this stream's task executor and retrier.
     *
     * @return the lazy stream returned by {@code fromStreamFutures}
     */
    default public LazyFutureStream<U> convertToLazyStream() {
        final Executor executor = this.getTaskExecutor();
        return new LazyReact(executor)
                .withRetrier(this.getRetrier())
                .fromStreamFutures(this.getLastActive().stream());
    }

    /**
     * Returns this stream with the async flag cleared, via {@code withAsync(false)}.
     *
     * @return the stream returned by {@code withAsync(false)}
     */
    @Override
    default public SimpleReactStream<U> sync() {
        final boolean runAsync = false;
        return this.withAsync(runAsync);
    }

    /**
     * Returns this stream with the async flag set, via {@code withAsync(true)}.
     *
     * @return the stream returned by {@code withAsync(true)}
     */
    @Override
    default public SimpleReactStream<U> async() {
        final boolean runAsync = true;
        return this.withAsync(runAsync);
    }

    /**
     * Maps every value of {@code stream} to a nested stream with {@code flatFn} and flattens
     * the result via {@link #join(SimpleReactStream)}.
     *
     * @param stream source stream
     * @param flatFn maps a value to a nested stream
     * @return the flattened stream
     */
    public static <U, R> SimpleReactStream<R> bind(SimpleReactStream<U> stream, Function<U, BaseSimpleReactStream<R>> flatFn) {
        final SimpleReactStream<BaseSimpleReactStream<R>> nested = stream.then(flatFn);
        return SimpleReactStream.join(nested);
    }

    /**
     * Flattens a stream of streams. Each inner stream offers its values to a shared queue,
     * which is closed once all of those stages have finished; the returned stream is built
     * from the queue's content.
     *
     * @param stream stream whose elements are themselves streams
     * @return a stream created by {@code stream.fromStream(..)} over the queue's stream
     */
    public static <U, R> SimpleReactStream<R> join(SimpleReactStream<BaseSimpleReactStream<U>> stream) {
        final Queue queue = stream.getQueueFactory().build();
        // Wire the producers first; the queue is closed by the allOf stage.
        stream.then(inner -> inner.sync().then(queue::offer))
              .allOf(done -> queue.close());
        final Stream drained = queue.stream(stream.getSubscription());
        return stream.fromStream(drained);
    }
}

