/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.operators.multi;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class MultiWindowOp<T>
extends AbstractMultiOperator<T, Multi<T>> {
    private final int size;
    private final int skip;
    private final Supplier<? extends Queue<T>> processorQueueSupplier;
    private final Supplier<? extends Queue<UnicastProcessor<T>>> overflowQueueSupplier;

    public MultiWindowOp(Multi<? extends T> upstream, int size, int skip) {
        super(upstream);
        this.size = ParameterValidation.positive(size, "size");
        this.skip = ParameterValidation.positive(skip, "skip");
        this.processorQueueSupplier = Queues.unbounded(Queues.BUFFER_XS);
        this.overflowQueueSupplier = Queues.unbounded(Queues.BUFFER_XS);
    }

    @Override
    public void subscribe(MultiSubscriber<? super Multi<T>> downstream) {
        if (this.skip == this.size) {
            this.upstream.subscribe().withSubscriber(new MultiWindowExactProcessor(downstream, this.size, this.processorQueueSupplier));
        } else if (this.skip > this.size) {
            this.upstream.subscribe().withSubscriber(new MultiWindowWithSkipProcessor(downstream, this.size, this.skip, this.processorQueueSupplier));
        } else {
            this.upstream.subscribe().withSubscriber(new MultiWindowWithOverlapProcessor<T>(downstream, this.size, this.skip, this.processorQueueSupplier, this.overflowQueueSupplier.get()));
        }
    }

    static final class MultiWindowWithOverlapProcessor<T>
    extends MultiOperatorProcessor<T, Multi<T>>
    implements Runnable {
        private final ArrayDeque<UnicastProcessor<T>> processors = new ArrayDeque();
        private final Supplier<? extends Queue<T>> supplier;
        private final Queue<UnicastProcessor<T>> overflow;
        private final int size;
        private final int skip;
        private final AtomicReference<Throwable> failure = new AtomicReference();
        private final AtomicInteger count = new AtomicInteger();
        private final AtomicBoolean firstRequest = new AtomicBoolean();
        private final AtomicLong requested = new AtomicLong();
        private final AtomicInteger wip = new AtomicInteger();
        private int index;
        private int produced;

        MultiWindowWithOverlapProcessor(MultiSubscriber<? super Multi<T>> downstream, int size, int skip, Supplier<? extends Queue<T>> supplier, Queue<UnicastProcessor<T>> overflowQueue) {
            super(downstream);
            this.size = size;
            this.skip = skip;
            this.supplier = supplier;
            this.count.lazySet(1);
            this.overflow = overflowQueue;
        }

        @Override
        public void onItem(T t) {
            if (this.isDone()) {
                return;
            }
            int i = this.index;
            if (i == 0 && !this.isCancelled()) {
                this.count.getAndIncrement();
                UnicastProcessor<T> proc = UnicastProcessor.create(this.supplier.get(), this);
                this.processors.offer(proc);
                this.overflow.offer(proc);
                this.drain();
            }
            ++i;
            for (UnicastProcessor<T> proc : this.processors) {
                proc.onNext(t);
            }
            int p = this.produced + 1;
            if (p == this.size) {
                UnicastProcessor<T> proc;
                this.produced = p - this.skip;
                proc = this.processors.poll();
                if (proc != null) {
                    proc.onComplete();
                }
            } else {
                this.produced = p;
            }
            this.index = i == this.skip ? 0 : i;
        }

        @Override
        public void onFailure(Throwable f) {
            Subscription subscription = this.upstream.getAndSet(Subscriptions.CANCELLED);
            if (subscription != Subscriptions.CANCELLED) {
                for (UnicastProcessor<T> proc : this.processors) {
                    proc.onError(f);
                }
                this.processors.clear();
                this.failure.set(f);
                this.drain();
            } else {
                Infrastructure.handleDroppedException(f);
            }
        }

        @Override
        public void onCompletion() {
            Subscription subscription = this.upstream.getAndSet(Subscriptions.CANCELLED);
            if (subscription != Subscriptions.CANCELLED) {
                for (UnicastProcessor<T> proc : this.processors) {
                    proc.onComplete();
                }
                this.processors.clear();
                this.drain();
            }
        }

        void drain() {
            if (this.wip.getAndIncrement() != 0) {
                return;
            }
            MultiSubscriber actual = this.downstream;
            Queue<UnicastProcessor<T>> q = this.overflow;
            int missed = 1;
            do {
                long e;
                long r = this.requested.get();
                for (e = 0L; e != r; ++e) {
                    boolean isEmpty;
                    boolean isDone = this.isDone();
                    UnicastProcessor<T> t = q.poll();
                    boolean bl = isEmpty = t == null;
                    if (this.isCancelledOrDone(isDone, isEmpty, actual, q)) {
                        return;
                    }
                    if (isEmpty) break;
                    actual.onItem(t);
                }
                if (e == r && this.isCancelledOrDone(this.isDone(), q.isEmpty(), actual, q)) {
                    return;
                }
                if (e == 0L || r == Long.MAX_VALUE) continue;
                this.requested.addAndGet(-e);
            } while ((missed = this.wip.addAndGet(-missed)) != 0);
        }

        boolean isCancelledOrDone(boolean isDone, boolean isEmpty, Subscriber<?> subscriber, Queue<?> q) {
            if (this.isCancelled()) {
                q.clear();
                return true;
            }
            if (isDone) {
                Throwable failed = this.failure.get();
                if (failed != null) {
                    q.clear();
                    subscriber.onError(failed);
                    return true;
                }
                if (isEmpty) {
                    subscriber.onComplete();
                    return true;
                }
            }
            return false;
        }

        @Override
        public void request(long n) {
            Subscriptions.add(this.requested, n);
            if (this.firstRequest.compareAndSet(false, true)) {
                long u = Subscriptions.multiply(this.skip, n - 1L);
                long v = Subscriptions.add(this.size, u);
                super.request(v);
            } else {
                long u = Subscriptions.multiply(this.skip, n);
                super.request(u);
            }
            this.drain();
        }

        @Override
        public void cancel() {
            if (this.hasDownstreamCancelled.compareAndSet(false, true)) {
                this.run();
            }
        }

        @Override
        public void run() {
            if (this.count.decrementAndGet() == 0) {
                ((Subscription)this.upstream.get()).cancel();
            }
        }
    }

    static final class MultiWindowWithSkipProcessor<T>
    extends MultiOperatorProcessor<T, Multi<T>> {
        private final Supplier<? extends Queue<T>> supplier;
        private final int size;
        private final int skip;
        private final AtomicInteger count = new AtomicInteger();
        private final AtomicBoolean firstRequest = new AtomicBoolean();
        int index;
        UnicastProcessor<T> processor;

        MultiWindowWithSkipProcessor(MultiSubscriber<? super Multi<T>> downstream, int size, int skip, Supplier<? extends Queue<T>> supplier) {
            super(downstream);
            this.size = size;
            this.skip = skip;
            this.supplier = supplier;
            this.count.lazySet(1);
        }

        @Override
        public void onItem(T t) {
            if (this.isDone()) {
                return;
            }
            int i = this.index;
            UnicastProcessor<T> proc = this.processor;
            if (i == 0) {
                this.count.getAndIncrement();
                proc = UnicastProcessor.create(this.supplier.get(), () -> {
                    if (this.count.decrementAndGet() == 0) {
                        ((Subscription)this.upstream.get()).cancel();
                    }
                });
                this.processor = proc;
                this.downstream.onItem(proc);
            }
            ++i;
            if (proc != null) {
                proc.onNext(t);
            }
            if (i == this.size) {
                this.processor = null;
                if (proc != null) {
                    proc.onComplete();
                }
            }
            this.index = i == this.skip ? 0 : i;
        }

        @Override
        public void onFailure(Throwable failure) {
            Subscription subscription = this.upstream.getAndSet(Subscriptions.CANCELLED);
            if (subscription != Subscriptions.CANCELLED) {
                UnicastProcessor<T> proc = this.processor;
                if (proc != null) {
                    this.processor = null;
                    proc.onError(failure);
                }
                this.downstream.onFailure(failure);
            } else {
                Infrastructure.handleDroppedException(failure);
            }
        }

        @Override
        public void onCompletion() {
            Subscription subscription = this.upstream.getAndSet(Subscriptions.CANCELLED);
            if (subscription != Subscriptions.CANCELLED) {
                UnicastProcessor<T> proc = this.processor;
                if (proc != null) {
                    this.processor = null;
                    proc.onComplete();
                }
                this.downstream.onCompletion();
            }
        }

        @Override
        public void request(long n) {
            if (this.firstRequest.compareAndSet(false, true)) {
                long u = Subscriptions.multiply(this.size, n);
                long v = Subscriptions.multiply((long)this.skip - (long)this.size, n - 1L);
                long w = Subscriptions.add(u, v);
                super.request(w);
            } else {
                long u = Subscriptions.multiply(this.skip, n);
                super.request(u);
            }
        }

        @Override
        public void cancel() {
            if (this.hasDownstreamCancelled.compareAndSet(false, true) && this.count.decrementAndGet() == 0) {
                ((Subscription)this.upstream.get()).cancel();
            }
        }
    }

    static final class MultiWindowExactProcessor<T>
    extends MultiOperatorProcessor<T, Multi<T>> {
        private final Supplier<? extends Queue<T>> supplier;
        private final int size;
        private final AtomicInteger count = new AtomicInteger();
        int index;
        private UnicastProcessor<T> processor;

        MultiWindowExactProcessor(MultiSubscriber<? super Multi<T>> downstream, int size, Supplier<? extends Queue<T>> supplier) {
            super(downstream);
            this.size = size;
            this.supplier = supplier;
            this.count.lazySet(1);
        }

        @Override
        public void onItem(T t) {
            if (this.isDone()) {
                return;
            }
            int i = this.index;
            UnicastProcessor<T> proc = this.processor;
            if (!this.isCancelled() && i == 0) {
                this.count.getAndIncrement();
                proc = UnicastProcessor.create(this.supplier.get(), () -> {
                    if (this.count.decrementAndGet() == 0) {
                        ((Subscription)this.upstream.get()).cancel();
                    }
                });
                this.processor = proc;
                this.downstream.onItem(proc);
            }
            proc.onNext(t);
            if (++i == this.size) {
                this.index = 0;
                this.processor = null;
                proc.onComplete();
            } else {
                this.index = i;
            }
        }

        @Override
        public void onFailure(Throwable failure) {
            Subscription subscription = this.upstream.getAndSet(Subscriptions.CANCELLED);
            if (subscription != Subscriptions.CANCELLED) {
                UnicastProcessor<T> proc = this.processor;
                if (proc != null) {
                    this.processor = null;
                    proc.onError(failure);
                }
                this.downstream.onFailure(failure);
            } else {
                Infrastructure.handleDroppedException(failure);
            }
        }

        @Override
        public void onCompletion() {
            Subscription subscription = this.upstream.getAndSet(Subscriptions.CANCELLED);
            if (subscription != Subscriptions.CANCELLED) {
                UnicastProcessor<T> proc = this.processor;
                if (proc != null) {
                    this.processor = null;
                    proc.onComplete();
                }
                this.downstream.onCompletion();
            }
        }

        @Override
        public void request(long n) {
            long u = Subscriptions.multiply(this.size, n);
            super.request(u);
        }

        @Override
        public void cancel() {
            if (this.hasDownstreamCancelled.compareAndSet(false, true) && this.count.decrementAndGet() == 0) {
                ((Subscription)this.upstream.get()).cancel();
            }
        }
    }
}

