/*
 * Decompiled with CFR 0.152.
 */
package io.opentelemetry.testing.internal.armeria.common.stream;

import io.opentelemetry.testing.internal.armeria.common.annotation.Nullable;
import io.opentelemetry.testing.internal.armeria.common.stream.CancelledSubscriptionException;
import io.opentelemetry.testing.internal.armeria.common.stream.ClosedStreamException;
import io.opentelemetry.testing.internal.armeria.common.stream.StreamMessage;
import io.opentelemetry.testing.internal.armeria.common.stream.SubscriptionOption;
import io.opentelemetry.testing.internal.armeria.common.util.EventLoopCheckingFuture;
import io.opentelemetry.testing.internal.armeria.internal.common.stream.InternalStreamMessageUtil;
import io.opentelemetry.testing.internal.armeria.internal.common.stream.StreamMessageUtil;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.collect.ImmutableList;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.collect.ImmutableSet;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.math.LongMath;
import io.opentelemetry.testing.internal.io.netty.util.concurrent.EventExecutor;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class FlatMapStreamMessage<T, U>
implements StreamMessage<U> {
    private final StreamMessage<T> source;
    private final Function<T, StreamMessage<U>> function;
    private final int maxConcurrency;
    private final CompletableFuture<Void> completionFuture;
    @Nullable
    private FlatMapAggregatingSubscriber<T, U> innerSubscriber;

    FlatMapStreamMessage(StreamMessage<? extends T> source, Function<? super T, ? extends StreamMessage<? extends U>> function, int maxConcurrency) {
        Objects.requireNonNull(source, "source");
        Objects.requireNonNull(function, "function");
        this.source = source;
        this.function = function;
        this.maxConcurrency = maxConcurrency;
        this.completionFuture = new EventLoopCheckingFuture<Void>();
    }

    @Override
    public boolean isOpen() {
        return !this.completionFuture.isDone();
    }

    @Override
    public boolean isEmpty() {
        return !this.isOpen() && this.innerSubscriber != null && !((FlatMapAggregatingSubscriber)this.innerSubscriber).publishedAny;
    }

    @Override
    public long demand() {
        if (this.innerSubscriber == null) {
            return 0L;
        }
        return ((FlatMapAggregatingSubscriber)this.innerSubscriber).requestedByDownstream;
    }

    @Override
    public CompletableFuture<Void> whenComplete() {
        return this.completionFuture;
    }

    @Override
    public void subscribe(Subscriber<? super U> subscriber, EventExecutor executor, SubscriptionOption ... options) {
        Objects.requireNonNull(subscriber, "subscriber");
        Objects.requireNonNull(executor, "executor");
        Objects.requireNonNull(options, "options");
        this.innerSubscriber = new FlatMapAggregatingSubscriber<T, U>(subscriber, this.function, executor, this.maxConcurrency, this.completionFuture, new SubscriptionOption[0]);
        this.source.subscribe(this.innerSubscriber, executor, options);
    }

    @Override
    public void abort() {
        this.source.abort();
    }

    @Override
    public void abort(Throwable cause) {
        Objects.requireNonNull(cause, "cause");
        this.source.abort(cause);
    }

    private static final class FlatMapAggregatingSubscriber<T, U>
    implements Subscriber<T>,
    Subscription {
        private final int maxConcurrency;
        private final Subscriber<? super U> downstream;
        private final Function<T, StreamMessage<U>> function;
        private final EventExecutor executor;
        private final Set<FlatMapSubscriber<T, U>> childSubscribers;
        private final Queue<U> buffer;
        private final CompletableFuture<Void> completionFuture;
        private final SubscriptionOption[] options;
        private final boolean notifyCancellation;
        @Nullable
        private volatile Subscription upstream;
        private long requestedByDownstream;
        private boolean closed;
        private boolean completing;
        private boolean initialized;
        private boolean publishedAny;

        FlatMapAggregatingSubscriber(Subscriber<? super U> downstream, Function<T, StreamMessage<U>> function, EventExecutor executor, int maxConcurrency, CompletableFuture<Void> completionFuture, SubscriptionOption ... options) {
            Objects.requireNonNull(downstream, "downstream");
            Objects.requireNonNull(function, "function");
            Objects.requireNonNull(executor, "executor");
            Objects.requireNonNull(completionFuture, "completionFuture");
            this.downstream = downstream;
            this.function = function;
            this.executor = executor;
            this.maxConcurrency = maxConcurrency;
            this.completionFuture = completionFuture;
            this.options = options;
            this.notifyCancellation = InternalStreamMessageUtil.containsNotifyCancellation(options);
            this.childSubscribers = new HashSet<FlatMapSubscriber<T, U>>();
            this.buffer = new ArrayDeque<U>();
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            Objects.requireNonNull(subscription, "subscription");
            this.upstream = subscription;
            this.downstream.onSubscribe(this);
        }

        @Override
        public void onNext(T item) {
            Objects.requireNonNull(item, "item");
            if (this.closed) {
                StreamMessageUtil.closeOrAbort(item);
                return;
            }
            StreamMessage<U> newStreamMessage = this.function.apply(item);
            FlatMapSubscriber childSubscriber = new FlatMapSubscriber(this);
            this.childSubscribers.add(childSubscriber);
            newStreamMessage.subscribe(childSubscriber, this.executor, this.options);
        }

        @Override
        public void onError(Throwable cause) {
            Objects.requireNonNull(cause, "cause");
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.cancelChildSubscribersAndBuffer(cause);
            this.downstream.onError(cause);
            this.completionFuture.completeExceptionally(cause);
        }

        @Override
        public void onComplete() {
            if (this.closed) {
                return;
            }
            this.completing = true;
            if (this.canComplete()) {
                this.complete();
            }
        }

        private void complete() {
            this.downstream.onComplete();
            this.completionFuture.complete(null);
            this.closed = true;
        }

        @Override
        public void request(long n) {
            if (n <= 0L) {
                this.onError(new IllegalArgumentException("n: " + n + " (expected: > 0, see Reactive Streams specification rule 3.9)"));
                this.cancel();
                return;
            }
            if (this.executor.inEventLoop()) {
                this.handleRequest(n);
            } else {
                this.executor.execute(() -> this.handleRequest(n));
            }
        }

        private void handleRequest(long n) {
            if (this.closed) {
                return;
            }
            this.requestedByDownstream = LongMath.saturatedAdd(this.requestedByDownstream, n);
            this.flush();
            if (!this.initialized) {
                this.initialized = true;
                Subscription upstream = this.upstream;
                assert (upstream != null);
                upstream.request(this.maxConcurrency);
            }
            this.requestAllAvailable();
        }

        @Override
        public void cancel() {
            if (this.executor.inEventLoop()) {
                this.cancel0();
            } else {
                this.executor.execute(this::cancel0);
            }
        }

        private void cancel0() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            Subscription upstream = this.upstream;
            assert (upstream != null);
            upstream.cancel();
            CancelledSubscriptionException cause = CancelledSubscriptionException.get();
            this.cancelChildSubscribersAndBuffer(cause);
            this.completionFuture.completeExceptionally(cause);
            if (this.notifyCancellation) {
                this.downstream.onError(cause);
            }
        }

        private void cancelChildSubscribersAndBuffer(Throwable cause) {
            for (Object item : this.buffer) {
                StreamMessageUtil.closeOrAbort(item, cause);
            }
            this.buffer.clear();
            ImmutableSet.copyOf(this.childSubscribers).forEach(FlatMapSubscriber::cancel);
        }

        private long getAvailableBufferSpace() {
            if (this.requestedByDownstream == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long requested = this.childSubscribers.stream().map(FlatMapSubscriber::getRequested).reduce(0L, LongMath::saturatedAdd);
            return (long)this.maxConcurrency - requested - (long)this.buffer.size();
        }

        private void requestAllAvailable() {
            if (this.childSubscribers.isEmpty()) {
                return;
            }
            long available = this.getAvailableBufferSpace();
            if (available == Long.MAX_VALUE) {
                this.childSubscribers.stream().filter(FlatMapSubscriber::subscribed).collect(ImmutableList.toImmutableList()).forEach(sub -> sub.request(Long.MAX_VALUE));
                return;
            }
            List toRequest = this.childSubscribers.stream().filter(FlatMapSubscriber::subscribed).filter(sub -> sub.getRequested() == 0L).limit(available).collect(ImmutableList.toImmutableList());
            toRequest.forEach(sub -> sub.request(1L));
        }

        private void flush() {
            while (this.requestedByDownstream > 0L && !this.buffer.isEmpty()) {
                U value = this.buffer.remove();
                this.publishDownstream(value);
            }
            if (this.canComplete()) {
                this.complete();
            }
        }

        void completeChild(FlatMapSubscriber<T, U> child) {
            this.childSubscribers.remove(child);
            if (this.canComplete()) {
                this.complete();
                return;
            }
            if (!this.closed && !this.completing) {
                Subscription upstream = this.upstream;
                assert (upstream != null);
                upstream.request(1L);
            }
        }

        private boolean canComplete() {
            return this.completing && this.childSubscribers.isEmpty() && this.buffer.isEmpty();
        }

        void onNextChild(U value) {
            Objects.requireNonNull(value, "value");
            if (this.requestedByDownstream > 0L) {
                this.publishDownstream(value);
            } else {
                this.buffer.add(value);
            }
            this.requestAllAvailable();
        }

        private void publishDownstream(U item) {
            if (this.closed) {
                StreamMessageUtil.closeOrAbort(item, ClosedStreamException.get());
                return;
            }
            this.publishedAny = true;
            if (this.requestedByDownstream != Long.MAX_VALUE) {
                --this.requestedByDownstream;
            }
            this.downstream.onNext(item);
        }
    }

    private static final class FlatMapSubscriber<T, U>
    implements Subscriber<U> {
        private final FlatMapAggregatingSubscriber<T, U> parent;
        private long requested;
        private boolean canceled;
        @Nullable
        private Subscription subscription;

        FlatMapSubscriber(FlatMapAggregatingSubscriber<T, U> parent) {
            Objects.requireNonNull(parent, "parent");
            this.parent = parent;
            this.requested = 0L;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            Objects.requireNonNull(subscription, "subscription");
            if (this.canceled) {
                subscription.cancel();
                return;
            }
            this.subscription = subscription;
            ((FlatMapAggregatingSubscriber)this.parent).requestAllAvailable();
        }

        @Override
        public void onNext(U value) {
            if (this.requested != Long.MAX_VALUE) {
                --this.requested;
            }
            if (this.canceled) {
                StreamMessageUtil.closeOrAbort(value);
                return;
            }
            this.parent.onNextChild(value);
        }

        @Override
        public void onError(Throwable cause) {
            Objects.requireNonNull(cause, "cause");
            if (this.canceled) {
                return;
            }
            this.canceled = true;
            this.parent.onError(cause);
        }

        @Override
        public void onComplete() {
            this.parent.completeChild(this);
        }

        public void request(long n) {
            assert (this.subscription != null);
            this.requested = LongMath.saturatedAdd(this.requested, n);
            this.subscription.request(n);
        }

        public void cancel() {
            if (this.canceled) {
                return;
            }
            this.canceled = true;
            if (this.subscription != null) {
                this.subscription.cancel();
            }
        }

        public long getRequested() {
            return this.requested;
        }

        boolean subscribed() {
            return this.subscription != null;
        }
    }
}

