/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.publisher;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.InnerOperator;
import reactor.core.publisher.InternalFluxOperator;
import reactor.core.publisher.OperatorDisposables;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.Queues;

final class FluxWindowTimeout<T>
extends InternalFluxOperator<T, Flux<T>> {
    final int maxSize;
    final long timespan;
    final TimeUnit unit;
    final Scheduler timer;

    /**
     * Builds the operator around {@code source}.
     *
     * @param source   upstream flux handed to the superclass constructor
     * @param maxSize  window size bound; must be greater than zero
     * @param timespan timer period, expressed in {@code unit}; must be greater than zero
     * @param unit     time unit of {@code timespan}; must not be null
     * @param timer    scheduler that subscribers use to create their worker; must not be null
     * @throws IllegalArgumentException if {@code timespan} or {@code maxSize} is not strictly positive
     * @throws NullPointerException     if {@code timer} or {@code unit} is null
     */
    FluxWindowTimeout(Flux<T> source, int maxSize, long timespan, TimeUnit unit, Scheduler timer) {
        super(source);

        // Checks run in a fixed order (timespan, maxSize, timer, unit) so that the
        // first invalid argument determines which exception the caller sees.
        if (!(timespan > 0L)) {
            throw new IllegalArgumentException("Timeout period must be strictly positive");
        }
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be strictly positive");
        }
        Objects.requireNonNull(timer, "Timer");
        Objects.requireNonNull(unit, "unit");

        this.maxSize = maxSize;
        this.timespan = timespan;
        this.unit = unit;
        this.timer = timer;
    }

    /**
     * Wraps the downstream subscriber in a {@link WindowTimeoutSubscriber} configured
     * with this operator's size bound, timespan, unit and scheduler.
     *
     * @param actual the downstream subscriber that will receive the windows
     * @return the subscriber to attach to the upstream source
     */
    @Override
    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super Flux<T>> actual) {
        // Diamond instead of a raw type so the element type is checked by the compiler.
        return new WindowTimeoutSubscriber<>(actual, this.maxSize, this.timespan, this.unit, this.timer);
    }

    /**
     * Answers the introspection attributes this operator knows about and delegates
     * every other key to the superclass.
     *
     * @param key the attribute being queried
     * @return the scheduler for {@code RUN_ON}, {@code ASYNC} for {@code RUN_STYLE},
     *         otherwise whatever the superclass reports
     */
    @Override
    public Object scanUnsafe(Scannable.Attr key) {
        final Object answer;
        if (Scannable.Attr.RUN_STYLE == key) {
            answer = Scannable.Attr.RunStyle.ASYNC;
        } else if (Scannable.Attr.RUN_ON == key) {
            answer = timer;
        } else {
            answer = super.scanUnsafe(key);
        }
        return answer;
    }

    static final class WindowTimeoutSubscriber<T>
    implements InnerOperator<T, Flux<T>> {
        final CoreSubscriber<? super Flux<T>> actual;
        final long timespan;
        final TimeUnit unit;
        final Scheduler scheduler;
        final int maxSize;
        final Scheduler.Worker worker;
        final Queue<Object> queue;
        Throwable error;
        volatile boolean done;
        volatile boolean cancelled;
        volatile long requested;
        static final AtomicLongFieldUpdater<WindowTimeoutSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(WindowTimeoutSubscriber.class, "requested");
        volatile int wip;
        static final AtomicIntegerFieldUpdater<WindowTimeoutSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(WindowTimeoutSubscriber.class, "wip");
        int count;
        long producerIndex;
        Subscription s;
        Sinks.Many<T> window;
        volatile boolean terminated;
        volatile Disposable timer;
        static final AtomicReferenceFieldUpdater<WindowTimeoutSubscriber, Disposable> TIMER = AtomicReferenceFieldUpdater.newUpdater(WindowTimeoutSubscriber.class, Disposable.class, "timer");

        /**
         * Stores the configuration, allocates the signal queue and creates the
         * scheduler worker used for the periodic window timer.
         *
         * @param actual    downstream subscriber receiving the windows
         * @param maxSize   number of items after which a window is closed
         * @param timespan  timer period, in {@code unit}
         * @param unit      unit of {@code timespan}
         * @param scheduler scheduler from which the worker is created
         */
        WindowTimeoutSubscriber(CoreSubscriber<? super Flux<T>> actual, int maxSize, long timespan, TimeUnit unit, Scheduler scheduler) {
            // Plain configuration first.
            this.actual = actual;
            this.maxSize = maxSize;
            this.timespan = timespan;
            this.unit = unit;
            this.scheduler = scheduler;

            // Resources last: the queue is allocated before the worker is created.
            this.queue = Queues.unboundedMultiproducer().get();
            this.worker = scheduler.createWorker();
        }

        /**
         * @return the downstream subscriber this operator emits windows to
         */
        @Override
        public CoreSubscriber<? super Flux<T>> actual() {
            return actual;
        }

        /**
         * Exposes the currently open window, if any, for introspection.
         *
         * @return a one-element stream wrapping the current window, or an empty
         *         stream when no window is set
         */
        @Override
        public Stream<? extends Scannable> inners() {
            final Sinks.Many<T> current = window;
            if (current != null) {
                return Stream.of(Scannable.from(current));
            }
            return Stream.empty();
        }

        /**
         * Reports this subscriber's state for the supported introspection attributes;
         * unknown keys are delegated to the {@code InnerOperator} default.
         *
         * @param key the attribute being queried
         * @return the attribute value (primitive values are boxed)
         */
        @Override
        public Object scanUnsafe(Scannable.Attr key) {
            final Object answer;
            if (key == Scannable.Attr.PARENT) {
                answer = s;
            } else if (key == Scannable.Attr.CANCELLED) {
                answer = cancelled;
            } else if (key == Scannable.Attr.TERMINATED) {
                answer = done;
            } else if (key == Scannable.Attr.REQUESTED_FROM_DOWNSTREAM) {
                answer = requested;
            } else if (key == Scannable.Attr.CAPACITY) {
                answer = maxSize;
            } else if (key == Scannable.Attr.BUFFERED) {
                answer = queue.size();
            } else if (key == Scannable.Attr.RUN_ON) {
                answer = worker;
            } else if (key == Scannable.Attr.RUN_STYLE) {
                answer = Scannable.Attr.RunStyle.ASYNC;
            } else {
                answer = InnerOperator.super.scanUnsafe(key);
            }
            return answer;
        }

        /**
         * Accepts the upstream subscription, hands this operator to the downstream as
         * its subscription, opens the first window and starts the periodic timer.
         *
         * <p>The downstream must already have demand when its {@code onSubscribe}
         * returns: the first window is emitted immediately, and an overflow error is
         * signalled if {@code requested} is still zero at that point.
         *
         * <p>Both early exits below happen before any timer task exists. Since
         * {@link #cancel()} only raises a flag that the timer task reads, nothing else
         * would ever release the worker (or, on the cancelled path, cancel the
         * upstream), so that is done here explicitly.
         *
         * @param s the upstream subscription
         */
        @Override
        public void onSubscribe(Subscription s) {
            if (!Operators.validate(this.s, s)) {
                return;
            }
            this.s = s;

            final CoreSubscriber<? super Flux<T>> a = this.actual;
            a.onSubscribe(this);

            if (this.cancelled) {
                // Downstream cancelled from within its onSubscribe: no timer will ever
                // run to observe the flag, so tear down right away.
                // NOTE(review): `timer` is still null on this path and on the overflow
                // path below; onError/onComplete/drainLoop dereference it unguarded.
                s.cancel();
                this.worker.dispose();
                return;
            }

            final Sinks.Many<T> w = Sinks.unsafe().many().unicast().onBackpressureBuffer();
            this.window = w;

            final long r = this.requested;
            if (r == 0L) {
                a.onError(Operators.onOperatorError(s, Exceptions.failWithOverflow(), this.actual.currentContext()));
                // The worker was created in the constructor and no timer was scheduled,
                // so it would otherwise never be disposed.
                this.worker.dispose();
                return;
            }

            a.onNext(w.asFlux());
            if (r != Long.MAX_VALUE) {
                REQUESTED.decrementAndGet(this);
            }

            // Upstream is consumed unbounded once the first timer period is installed.
            if (OperatorDisposables.replace(TIMER, this, this.newPeriod())) {
                s.request(Long.MAX_VALUE);
            }
        }

        /**
         * Schedules a fresh periodic timer task on the worker, using {@code timespan}
         * as both the initial delay and the period. The task is tagged with the
         * current {@code producerIndex}.
         *
         * @return the handle of the scheduled task; if scheduling throws, the mapped
         *         error is signalled downstream and an already-disposed handle is
         *         returned instead
         */
        Disposable newPeriod() {
            final long period = timespan;
            try {
                final ConsumerIndexHolder tick = new ConsumerIndexHolder(producerIndex, this);
                return worker.schedulePeriodically(tick, period, period, unit);
            }
            catch (Exception rejected) {
                final Throwable mapped =
                        Operators.onRejectedExecution(rejected, s, null, null, actual.currentContext());
                actual.onError(mapped);
                return Disposables.disposed();
            }
        }

        /**
         * Routes an upstream item into the current window.
         *
         * <p>If no other thread is draining ({@code wip} can be moved from 0 to 1) the
         * item is emitted directly. Otherwise it is queued and the drain loop is
         * entered only if this call is the one that took {@code wip} from zero.
         *
         * <p>When the item fills the window up to {@code maxSize}, the window is
         * completed, a new one is emitted downstream (consuming one unit of demand
         * unless demand is {@code Long.MAX_VALUE}) and the timer is restarted. With
         * zero demand at that point an overflow error is signalled, the timer and the
         * worker are disposed, and the method returns without releasing {@code wip}.
         *
         * @param t the upstream item
         */
        @Override
        public void onNext(T t) {
            if (terminated) {
                return;
            }

            // Contended path: somebody else owns the drain, so just enqueue.
            if (WIP.get(this) != 0 || !WIP.compareAndSet(this, 0, 1)) {
                queue.offer(t);
                if (enter()) {
                    drainLoop();
                }
                return;
            }

            // Uncontended path: emit straight into the open window.
            Sinks.Many<T> current = window;
            current.emitNext(t, Sinks.EmitFailureHandler.FAIL_FAST);

            final int filled = count + 1;
            if (filled < maxSize) {
                count = filled;
            } else {
                // Size bound reached: close this window and roll over to a new one.
                ++producerIndex;
                count = 0;
                current.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);

                final long demand = requested;
                if (demand != 0L) {
                    current = Sinks.unsafe().many().unicast().onBackpressureBuffer();
                    window = current;
                    actual.onNext(current.asFlux());
                    if (demand != Long.MAX_VALUE) {
                        REQUESTED.decrementAndGet(this);
                    }

                    // Restart the period; if the timer field changed in the meantime,
                    // drop the task that was just scheduled.
                    final Disposable previous = timer;
                    previous.dispose();
                    final Disposable replacement = newPeriod();
                    if (!TIMER.compareAndSet(this, previous, replacement)) {
                        replacement.dispose();
                    }
                } else {
                    // No demand for another window.
                    window = null;
                    actual.onError(Operators.onOperatorError(s, Exceptions.failWithOverflow(), t, actual.currentContext()));
                    timer.dispose();
                    worker.dispose();
                    return;
                }
            }

            // Release ownership; drain whatever was queued while this call held it.
            if (WIP.decrementAndGet(this) != 0) {
                drainLoop();
            }
        }

        /**
         * Records the upstream failure, marks the sequence as done, drains if this
         * call obtains drain ownership, then forwards the error downstream and
         * disposes the timer task and the worker.
         *
         * @param t the upstream error
         */
        @Override
        public void onError(Throwable t) {
            // The plain `error` field is written before the volatile `done` flag.
            error = t;
            done = true;

            final boolean ownsDrain = WIP.getAndIncrement(this) == 0;
            if (ownsDrain) {
                drainLoop();
            }

            actual.onError(t);
            timer.dispose();
            worker.dispose();
        }

        /**
         * Marks the sequence as done, drains if this call obtains drain ownership,
         * then completes the downstream and disposes the timer task and the worker.
         */
        @Override
        public void onComplete() {
            done = true;

            final boolean ownsDrain = WIP.getAndIncrement(this) == 0;
            if (ownsDrain) {
                drainLoop();
            }

            actual.onComplete();
            timer.dispose();
            worker.dispose();
        }

        /**
         * Adds downstream demand for windows to {@code requested} via
         * {@code Operators.addCap}. Amounts rejected by {@code Operators.validate}
         * are not added.
         *
         * @param n the number of additional windows requested
         */
        @Override
        public void request(long n) {
            if (!Operators.validate(n)) {
                return;
            }
            Operators.addCap(REQUESTED, this, n);
        }

        /**
         * Flags the subscriber as cancelled. Nothing else happens here; the flag is
         * read by the timer task, which then marks the subscriber as terminated.
         */
        @Override
        public void cancel() {
            cancelled = true;
        }

        /**
         * Serialized drain of the signal queue. Must only be called by the thread that
         * took {@code wip} from zero (see {@link #enter()}) or that already holds it.
         *
         * <p>The queue carries two kinds of entries: upstream items, which are emitted
         * into the current window, and {@link ConsumerIndexHolder} instances enqueued
         * by the timer, which close the current window and open a new one. Each
         * iteration first checks for termination, then for upstream completion, and
         * only then processes one queue entry.
         *
         * <p>The loop exits when the queue is empty and no further work was signalled,
         * or returns early on termination, upstream completion/error, or overflow
         * (a new window is due but downstream demand is zero).
         */
        @SuppressWarnings("unchecked")
        void drainLoop() {
            final Queue<Object> q = this.queue;
            final CoreSubscriber<? super Flux<T>> a = this.actual;
            Sinks.Many<T> w = this.window;
            int missed = 1;
            while (true) {
                // Cancellation observed by the timer task: release everything.
                if (this.terminated) {
                    this.s.cancel();
                    q.clear();
                    this.timer.dispose();
                    this.worker.dispose();
                    return;
                }

                // `done` is read before polling the queue.
                boolean d = this.done;
                Object o = q.poll();
                boolean empty = o == null;
                boolean isHolder = o instanceof ConsumerIndexHolder;

                // Upstream finished and nothing but (at most) a timer tick is pending:
                // terminate the open window with the recorded outcome.
                if (d && (empty || isHolder)) {
                    this.window = null;
                    q.clear();
                    Throwable err = this.error;
                    if (err != null) {
                        w.emitError(err, Sinks.EmitFailureHandler.FAIL_FAST);
                    } else {
                        w.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                    }
                    this.timer.dispose();
                    this.worker.dispose();
                    return;
                }

                if (empty) {
                    // Queue exhausted: give up ownership unless more work arrived meanwhile.
                    missed = WIP.addAndGet(this, -missed);
                    if (missed == 0) {
                        break;
                    }
                    continue;
                }

                if (isHolder) {
                    // Timer tick: close the current window and open the next one.
                    w.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                    this.count = 0;
                    w = Sinks.unsafe().many().unicast().onBackpressureBuffer();
                    this.window = w;
                    long tickDemand = this.requested;
                    if (tickDemand == 0L) {
                        this.window = null;
                        this.queue.clear();
                        a.onError(Operators.onOperatorError(this.s, Exceptions.failWithOverflow(), this.actual.currentContext()));
                        this.timer.dispose();
                        this.worker.dispose();
                        return;
                    }
                    a.onNext(w.asFlux());
                    if (tickDemand != Long.MAX_VALUE) {
                        REQUESTED.decrementAndGet(this);
                    }
                    continue;
                }

                // Regular item. Every non-holder entry was enqueued by onNext(T),
                // which is what makes the unchecked cast safe.
                w.emitNext((T) o, Sinks.EmitFailureHandler.FAIL_FAST);
                int c = this.count + 1;
                if (c < this.maxSize) {
                    this.count = c;
                    continue;
                }

                // Size bound reached: roll the window over and restart the timer.
                ++this.producerIndex;
                this.count = 0;
                w.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
                long sizeDemand = this.requested;
                if (sizeDemand == 0L) {
                    this.window = null;
                    a.onError(Operators.onOperatorError(this.s, Exceptions.failWithOverflow(), o, this.actual.currentContext()));
                    this.timer.dispose();
                    this.worker.dispose();
                    return;
                }
                w = Sinks.unsafe().many().unicast().onBackpressureBuffer();
                this.window = w;
                this.actual.onNext(w.asFlux());
                if (sizeDemand != Long.MAX_VALUE) {
                    REQUESTED.decrementAndGet(this);
                }
                Disposable tm = this.timer;
                tm.dispose();
                Disposable task = this.newPeriod();
                // If the timer field changed in the meantime, drop the task just scheduled.
                if (!TIMER.compareAndSet(this, tm, task)) {
                    task.dispose();
                }
            }
        }

        /**
         * Registers one more unit of pending work on {@code wip}.
         *
         * @return {@code true} if {@code wip} was zero before the increment, i.e. the
         *         caller is now responsible for running the drain loop
         */
        boolean enter() {
            final int before = WIP.getAndIncrement(this);
            return before == 0;
        }

        /**
         * Periodic timer task. On each run it either enqueues itself on the parent's
         * queue as a "window boundary" marker or, if the parent has been cancelled,
         * marks the parent as terminated and disposes its timer and worker. In both
         * cases it then tries to enter the parent's drain loop.
         */
        static final class ConsumerIndexHolder
        implements Runnable {
            /**
             * Value passed in at construction time ({@code newPeriod()} passes the
             * parent's {@code producerIndex}). It is stored but not read anywhere in
             * the visible code.
             */
            final long index;
            /** Subscriber whose queue and state this task operates on. */
            final WindowTimeoutSubscriber<?> parent;

            ConsumerIndexHolder(long index, WindowTimeoutSubscriber<?> parent) {
                this.parent = parent;
                this.index = index;
            }

            @Override
            public void run() {
                final WindowTimeoutSubscriber<?> owner = parent;
                if (owner.cancelled) {
                    // Downstream cancelled: flag termination and release the resources;
                    // the drain loop below performs the remaining cleanup.
                    owner.terminated = true;
                    owner.timer.dispose();
                    owner.worker.dispose();
                } else {
                    // Hand the tick to the drain loop as a queue marker.
                    owner.queue.offer(this);
                }
                if (owner.enter()) {
                    owner.drainLoop();
                }
            }
        }
    }
}

