/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.utils;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Operators;
import reactor.core.publisher.SignalType;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

public class SerialFlux<T> {
    static final AtomicReferenceFieldUpdater<SerialFlux, Pending> WIP = AtomicReferenceFieldUpdater.newUpdater(SerialFlux.class, Pending.class, "wip");
    static final AtomicReferenceFieldUpdater<Pending.PendingSubscriber, CoreSubscriber> ACTUAL = AtomicReferenceFieldUpdater.newUpdater(Pending.PendingSubscriber.class, CoreSubscriber.class, "actual");
    final Queue<Pending<T>> queue;
    volatile Pending<T> wip;

    public SerialFlux() {
        this(Queues.small());
    }

    public SerialFlux(Supplier<Queue<Pending<T>>> queueSupplier) {
        this.queue = queueSupplier.get();
    }

    public int size() {
        return this.queue.size();
    }

    public Flux<T> join(Flux<T> join) {
        Pending<T> pending = new Pending<T>(this, join);
        if (!this.queue.offer(pending)) {
            return Flux.error((Throwable)new IllegalStateException("pending queue is full"));
        }
        return pending;
    }

    void drain() {
        Pending<T> pending;
        if (this.wip != null) {
            return;
        }
        do {
            if (!WIP.compareAndSet(this, null, pending = this.queue.poll())) continue;
            if (pending != null) {
                pending.doSubscribe();
            }
            return;
        } while (pending == null || this.queue.offer(pending));
        pending.doSubscribe();
    }

    static class Pending<T>
    extends FluxOperator<T, T> {
        private final SerialFlux<T> main;
        private final PendingSubscriber subscriber = new PendingSubscriber();

        protected Pending(SerialFlux<T> main, Flux<? extends T> source) {
            super(source);
            this.main = main;
        }

        public Object scanUnsafe(@Nonnull Scannable.Attr key) {
            if (key == Scannable.Attr.TERMINATED) {
                return this.subscriber.isDisposed();
            }
            return super.scanUnsafe(key);
        }

        public void subscribe(@Nonnull CoreSubscriber<? super T> actual) {
            this.subscriber.addSubscriber(actual);
            this.main.drain();
        }

        void doSubscribe() {
            if (this.subscriber.isCompleted()) {
                WIP.compareAndSet(this.main, this, null);
                this.main.drain();
                return;
            }
            this.source.subscribe((CoreSubscriber)this.subscriber);
        }

        class PendingSubscriber
        extends BaseSubscriber<T> {
            volatile CoreSubscriber<? super T> actual = null;
            long requested = Long.MAX_VALUE;

            PendingSubscriber() {
            }

            protected void hookFinally(@Nonnull SignalType type) {
                this.complete();
            }

            private boolean isCompleted() {
                return ACTUAL.get(this) == this;
            }

            private void complete() {
                ACTUAL.set(this, (CoreSubscriber)this);
                if (WIP.compareAndSet(Pending.this.main, Pending.this, null)) {
                    Pending.this.main.drain();
                }
            }

            @Nonnull
            public Context currentContext() {
                return this.actual == null || this.isCompleted() ? super.currentContext() : this.actual.currentContext();
            }

            protected void hookOnNext(@Nonnull T value) {
                CoreSubscriber actual = this.actual;
                if (this.isCompleted() || actual == null) {
                    Operators.onDiscard(value, (Context)this.currentContext());
                } else {
                    actual.onNext(value);
                }
            }

            protected void hookOnComplete() {
                if (this.isCompleted()) {
                    return;
                }
                CoreSubscriber actual = this.actual;
                if (actual != null) {
                    actual.onComplete();
                }
            }

            protected void hookOnError(@Nonnull Throwable throwable) {
                if (this.isCompleted()) {
                    return;
                }
                CoreSubscriber actual = this.actual;
                if (actual != null) {
                    actual.onError(throwable);
                }
            }

            protected void hookOnSubscribe(@Nonnull Subscription subscription) {
                subscription.request(this.requested);
            }

            private synchronized void addSubscriber(CoreSubscriber<? super T> subscriber) {
                if (!ACTUAL.compareAndSet(this, null, subscriber)) {
                    Operators.error(subscriber, (Throwable)new IllegalStateException("SerialFlux allows only a single Subscriber"));
                    return;
                }
                subscriber.onSubscribe(new Subscription(){

                    public void request(long n) {
                        PendingSubscriber.this.requested = n;
                    }

                    public void cancel() {
                        if (PendingSubscriber.this.upstream() == null) {
                            PendingSubscriber.this.dispose();
                            PendingSubscriber.this.complete();
                        } else {
                            PendingSubscriber.this.upstream().cancel();
                            PendingSubscriber.this.complete();
                        }
                    }
                });
            }
        }
    }
}

