/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.types.stream.reactive;

import com.aol.cyclops.data.async.Queue;
import com.aol.cyclops.internal.react.exceptions.SimpleReactProcessingException;
import com.aol.cyclops.internal.react.stream.LazyStreamWrapper;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public interface FutureStreamSynchronousPublisher<T>
extends Publisher<T> {
    public LazyStreamWrapper getLastActive();

    public void cancel();

    public void forwardErrors(Consumer<Throwable> var1);

    default public void subscribeSync(Subscriber<? super T> s) {
        this.subscribe(s);
    }

    default public void subscribe(final Subscriber<? super T> s) {
        try {
            this.forwardErrors(t -> s.onError(t));
            final Queue<T> queue = this.toQueue();
            final Iterator it = queue.streamCompletableFutures().iterator();
            Subscription sub = new Subscription(){
                volatile boolean complete = false;
                volatile boolean cancelled = false;
                final LinkedList<Long> requests = new LinkedList();
                boolean active = false;

                private void handleNext(T data) {
                    if (!this.cancelled) {
                        s.onNext(data);
                    }
                }

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void request(long n) {
                    if (n < 1L) {
                        s.onError((Throwable)new IllegalArgumentException("3.9 While the Subscription is not cancelled, Subscription.request(long n) MUST throw a java.lang.IllegalArgumentException if the argument is <= 0."));
                    }
                    this.requests.add(n);
                    ArrayList<CompletableFuture> results = new ArrayList<CompletableFuture>();
                    if (this.active) {
                        return;
                    }
                    this.active = true;
                    try {
                        while (!this.cancelled && this.requests.size() > 0) {
                            long n2 = this.requests.peek();
                            int i = 0;
                            while ((long)i < n2) {
                                block10: {
                                    try {
                                        if (it.hasNext()) {
                                            this.handleNext(s, it, results);
                                            break block10;
                                        }
                                        this.handleComplete(results, s);
                                        break;
                                    }
                                    catch (Throwable t) {
                                        s.onError(t);
                                    }
                                }
                                ++i;
                            }
                            this.requests.pop();
                        }
                    }
                    finally {
                        this.active = false;
                    }
                }

                private void handleComplete(List<CompletableFuture> results, Subscriber<? super T> s2) {
                    if (!this.complete && !this.cancelled) {
                        this.complete = true;
                        if (results.size() > 0) {
                            ((CompletableFuture)CompletableFuture.allOf(results.stream().map(cf -> cf.exceptionally(e -> null)).collect(Collectors.toList()).toArray(new CompletableFuture[results.size()])).thenAccept(a -> this.callOnComplete(s2))).exceptionally(e -> {
                                this.callOnComplete(s2);
                                return null;
                            });
                        } else {
                            this.callOnComplete(s2);
                        }
                    }
                }

                private void callOnComplete(Subscriber<? super T> s2) {
                    s2.onComplete();
                }

                private void handleNext(Subscriber<? super T> s2, Iterator<CompletableFuture<T>> it2, List<CompletableFuture> results) {
                    results.add((CompletableFuture)((CompletableFuture)it2.next().thenAccept(r -> s2.onNext(r))).exceptionally(t -> {
                        s2.onError(t);
                        return null;
                    }));
                    List newResults = results.stream().filter(cf -> cf.isDone()).collect(Collectors.toList());
                    results.removeAll(newResults);
                }

                public void cancel() {
                    this.cancelled = true;
                    FutureStreamSynchronousPublisher.this.forwardErrors(t -> {});
                    queue.closeAndClear();
                }
            };
            s.onSubscribe(sub);
        }
        catch (SimpleReactProcessingException simpleReactProcessingException) {
            // empty catch block
        }
    }

    public Queue<T> toQueue();
}

