/*
 * Decompiled with CFR 0.152.
 */
package com.ibm.asyncutil.flow;

import com.ibm.asyncutil.iteration.AsyncIterator;
import com.ibm.asyncutil.iteration.AsyncTrampoline;
import com.ibm.asyncutil.locks.AsyncLock;
import com.ibm.asyncutil.util.Either;
import com.ibm.asyncutil.util.StageSupport;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class FlowAdapter {
    private FlowAdapter() {
    }

    public static <T> AsyncIterator<T> toAsyncIterator(Flow.Publisher<? extends T> publisher) {
        return new SubscribingIterator<T>(publisher);
    }

    public static <T> Flow.Publisher<T> toPublisher(AsyncIterator<? extends T> asyncIterator) {
        return new IteratorBackedPublisher(asyncIterator);
    }

    public static <T> Flow.Publisher<T> toPublisher(Supplier<AsyncIterator<? extends T>> asyncIteratorSupplier) {
        return new SuppliedIteratorBackedPublisher(asyncIteratorSupplier);
    }

    private static class IteratorBackedSubscription<T>
    implements Flow.Subscription {
        final AsyncIterator<T> iterator;
        final Flow.Subscriber<? super T> subscriber;
        final AsyncLock lock = AsyncLock.create();
        final AtomicBoolean finished = new AtomicBoolean();

        IteratorBackedSubscription(AsyncIterator<T> iterator, Flow.Subscriber<? super T> subscriber) {
            this.iterator = Objects.requireNonNull(iterator);
            this.subscriber = Objects.requireNonNull(subscriber);
        }

        @Override
        public void request(long n) {
            StageSupport.tryComposeWith((CompletionStage)this.lock.acquireLock(), token -> this.getn(n));
        }

        private CompletionStage<Void> getn(final long n) {
            CompletionStage fillStage = n <= 0L ? StageSupport.exceptionalStage((Throwable)new IllegalArgumentException("subscription requests must be positive")) : AsyncTrampoline.asyncWhile((Supplier)new Supplier<CompletionStage<Boolean>>(){
                long demand;
                {
                    this.demand = n;
                }

                @Override
                public CompletionStage<Boolean> get() {
                    if (this.demand-- == 0L || finished.get()) {
                        return StageSupport.completedStage((Object)false);
                    }
                    return iterator.nextStage().thenCompose(e -> (CompletionStage)e.fold(this::onEnd, this::onNext));
                }

                private CompletionStage<Boolean> onEnd(AsyncIterator.End end) {
                    if (this.finish()) {
                        return iterator.close().handle((ig, closeEx) -> {
                            this.notifySubscriber(false, null, closeEx);
                            return false;
                        });
                    }
                    return StageSupport.completedStage((Object)false);
                }

                private CompletionStage<Boolean> onNext(T next) {
                    subscriber.onNext(next);
                    return StageSupport.completedStage((Object)true);
                }
            });
            return fillStage.exceptionally(e -> {
                if (this.finish()) {
                    this.iterator.close().whenComplete((ig, closeEx) -> this.notifySubscriber(false, (Throwable)e, (Throwable)closeEx));
                }
                return null;
            });
        }

        private boolean finish() {
            return !this.finished.getAndSet(true);
        }

        private void notifySubscriber(boolean cancelled, Throwable exception, Throwable closeException) {
            exception = this.unwrapCompletionException(exception);
            closeException = this.unwrapCompletionException(closeException);
            if (exception == null && closeException == null) {
                if (!cancelled) {
                    this.subscriber.onComplete();
                }
            } else if (exception != null && closeException != null) {
                exception.addSuppressed(closeException);
                this.subscriber.onError(exception);
            } else if (exception != null) {
                this.subscriber.onError(exception);
            } else {
                this.subscriber.onError(closeException);
            }
        }

        private Throwable unwrapCompletionException(Throwable throwable) {
            if (throwable instanceof CompletionException) {
                return throwable.getCause();
            }
            return throwable;
        }

        @Override
        public void cancel() {
            if (this.finish()) {
                StageSupport.tryComposeWith((CompletionStage)this.lock.acquireLock(), token -> this.iterator.close().whenComplete((ig, closeEx) -> this.notifySubscriber(true, null, (Throwable)closeEx)));
            }
        }
    }

    static class SubscribingIterator<T>
    implements Flow.Subscriber<T>,
    AsyncIterator<T> {
        final Flow.Publisher<? extends T> publisher;
        Flow.Subscription subscription;
        CompletableFuture<Either<AsyncIterator.End, T>> next = new CompletableFuture();

        SubscribingIterator() {
            this.publisher = null;
        }

        SubscribingIterator(Flow.Publisher<? extends T> publisher) {
            this.publisher = Objects.requireNonNull(publisher);
        }

        public CompletionStage<Either<AsyncIterator.End, T>> nextStage() {
            if (this.subscription == null) {
                Objects.requireNonNull(this.publisher, "subscriber with no publisher must manually be subscribed before consumption").subscribe(this);
            }
            CompletableFuture<Either<AsyncIterator.End, T>> curr = this.next;
            this.subscription.request(1L);
            return curr;
        }

        public CompletionStage<Void> close() {
            if (this.subscription != null) {
                this.subscription.cancel();
            }
            return StageSupport.voidStage();
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            Objects.requireNonNull(subscription);
            if (this.subscription != null) {
                subscription.cancel();
                return;
            }
            this.subscription = subscription;
        }

        @Override
        public void onNext(T item) {
            Objects.requireNonNull(item);
            CompletableFuture<Either<AsyncIterator.End, Either>> curr = this.next;
            this.next = new CompletableFuture();
            curr.complete(Either.right(item));
        }

        @Override
        public void onError(Throwable throwable) {
            this.next.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            this.next.complete(AsyncIterator.End.end());
        }

        public String toString() {
            return super.toString() + (String)(this.subscription == null ? "[not-subscribed]" : "[subscription: " + this.subscription.toString() + "]");
        }
    }

    private static class IteratorBackedPublisher<T>
    implements Flow.Publisher<T> {
        private static final VarHandle SUBSCRIBED_HANDLE;
        private final AsyncIterator<? extends T> asyncIterator;
        private volatile boolean subscribed;

        private IteratorBackedPublisher(AsyncIterator<? extends T> asyncIterator) {
            this.asyncIterator = asyncIterator;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            if (SUBSCRIBED_HANDLE.getAndSet(this, true)) {
                subscriber.onError(new IllegalStateException("Publisher " + this + " does not support multiple subscribers"));
                return;
            }
            subscriber.onSubscribe(new IteratorBackedSubscription<T>(this.asyncIterator, subscriber));
        }

        public String toString() {
            return super.toString() + "[backed by " + this.asyncIterator.toString() + "]";
        }

        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                SUBSCRIBED_HANDLE = l.findVarHandle(IteratorBackedPublisher.class, "subscribed", Boolean.TYPE);
            }
            catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }

    private static class SuppliedIteratorBackedPublisher<T>
    implements Flow.Publisher<T> {
        private final Supplier<AsyncIterator<? extends T>> asyncIteratorSupplier;

        private SuppliedIteratorBackedPublisher(Supplier<AsyncIterator<? extends T>> asyncIteratorSupplier) {
            this.asyncIteratorSupplier = asyncIteratorSupplier;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new IteratorBackedSubscription<T>(this.asyncIteratorSupplier.get(), subscriber));
        }

        public String toString() {
            return super.hashCode() + "[backed by " + this.asyncIteratorSupplier.toString() + "]";
        }
    }
}

