/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.processor.util;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Dispatcher;
import reactor.core.processor.InsufficientCapacityException;
import reactor.core.processor.MutableSignal;
import reactor.core.support.Exceptions;
import reactor.core.support.NonBlocking;
import reactor.core.support.SpecificationExceptions;
import reactor.jarjar.com.lmax.disruptor.AlertException;
import reactor.jarjar.com.lmax.disruptor.RingBuffer;
import reactor.jarjar.com.lmax.disruptor.Sequence;
import reactor.jarjar.com.lmax.disruptor.SequenceBarrier;
import reactor.jarjar.com.lmax.disruptor.TimeoutException;

public final class RingBufferSubscriberUtils {
    /**
     * Not instantiable: this class only exposes static helpers.
     */
    private RingBufferSubscriberUtils() {
        // static utility holder; no instances are ever created
    }

    /**
     * Claims the next ring buffer sequence, stores {@code value} in that slot as a
     * {@code NEXT} signal and publishes the sequence.
     *
     * @param value      the element to enqueue; must not be {@code null}
     * @param ringBuffer the ring buffer receiving the signal
     * @param <E>        the element type carried by the signals
     * @throws RuntimeException the exception supplied by
     *         {@code SpecificationExceptions.spec_2_13_exception()} when {@code value} is
     *         {@code null} (presumably the one mandated by Reactive Streams rule 2.13)
     */
    public static <E> void onNext(E value, RingBuffer<MutableSignal<E>> ringBuffer) {
        if (null == value) {
            throw SpecificationExceptions.spec_2_13_exception();
        }

        final long sequence = ringBuffer.next();
        final MutableSignal<E> slot = ringBuffer.get(sequence);

        // Fill the slot completely before making it visible through publish().
        slot.value = value;
        slot.type = MutableSignal.Type.NEXT;

        ringBuffer.publish(sequence);
    }

    /**
     * Claims the next ring buffer sequence, marks that slot as an {@code ERROR} signal
     * carrying {@code error} (clearing any value left in the slot) and publishes it.
     *
     * @param error      the failure to enqueue; must not be {@code null}
     * @param ringBuffer the ring buffer receiving the signal
     * @param <E>        the element type carried by the signals
     * @throws RuntimeException the exception supplied by
     *         {@code SpecificationExceptions.spec_2_13_exception()} when {@code error} is
     *         {@code null} (presumably the one mandated by Reactive Streams rule 2.13)
     */
    public static <E> void onError(Throwable error, RingBuffer<MutableSignal<E>> ringBuffer) {
        if (null == error) {
            throw SpecificationExceptions.spec_2_13_exception();
        }

        final long sequence = ringBuffer.next();
        final MutableSignal<E> slot = ringBuffer.get(sequence);

        // Slots are reused, so wipe the value field before exposing the error.
        slot.value = null;
        slot.error = error;
        slot.type = MutableSignal.Type.ERROR;

        ringBuffer.publish(sequence);
    }

    /**
     * Claims the next ring buffer sequence, marks that slot as a {@code COMPLETE} signal
     * with both its value and error fields cleared, and publishes it.
     *
     * @param ringBuffer the ring buffer receiving the signal
     * @param <E>        the element type carried by the signals
     */
    public static <E> void onComplete(RingBuffer<MutableSignal<E>> ringBuffer) {
        final long sequence = ringBuffer.next();
        final MutableSignal<E> slot = ringBuffer.get(sequence);

        // Slots are reused, so drop whatever payload the slot still holds.
        slot.value = null;
        slot.error = null;
        slot.type = MutableSignal.Type.COMPLETE;

        ringBuffer.publish(sequence);
    }

    /**
     * Dispatches a signal to the matching {@link Subscriber} callback.
     *
     * <p>A {@code NEXT} signal is delivered only when its value is non-null; a
     * {@code NEXT} signal with a null value, or a signal of any other unrecognised type,
     * is silently ignored.
     *
     * @param task       the signal to dispatch
     * @param subscriber the subscriber to notify
     * @param <E>        the element type carried by the signal
     */
    public static <E> void route(MutableSignal<E> task, Subscriber<? super E> subscriber) {
        if (task.type == MutableSignal.Type.NEXT && task.value != null) {
            subscriber.onNext(task.value);
            return;
        }
        if (task.type == MutableSignal.Type.COMPLETE) {
            subscriber.onComplete();
            return;
        }
        if (task.type == MutableSignal.Type.ERROR) {
            subscriber.onError(task.error);
        }
    }

    /**
     * Dispatches a signal to the matching {@link Subscriber} callback, detaching the value
     * from the signal first.
     *
     * <p>The signal's {@code value} field is set to {@code null} before the subscriber is
     * invoked. If the subscriber callback throws, the value is put back into the signal
     * and the same throwable is rethrown, so the signal is left as it was found.
     *
     * <p>As with {@code route}, a {@code NEXT} signal whose value is {@code null} is ignored.
     *
     * @param task       the signal to dispatch; its {@code value} is cleared on success
     * @param subscriber the subscriber to notify
     * @param <E>        the element type carried by the signal
     */
    public static <E> void routeOnce(MutableSignal<E> task, Subscriber<? super E> subscriber) {
        // Typed as E (not Object) so the value can legally be handed to Subscriber<? super E>.
        final E value = task.value;
        task.value = null;
        try {
            if (task.type == MutableSignal.Type.NEXT && null != value) {
                subscriber.onNext(value);
            } else if (task.type == MutableSignal.Type.COMPLETE) {
                subscriber.onComplete();
            } else if (task.type == MutableSignal.Type.ERROR) {
                subscriber.onError(task.error);
            }
        }
        catch (Throwable t) {
            // Delivery failed: restore the value before propagating the failure.
            task.value = value;
            throw t;
        }
    }

    /**
     * Spins while {@code pendingRequest} is negative, watching the first upcoming ring
     * buffer sequence for a terminal signal.
     *
     * <p>On the first pass the method waits on {@code barrier} for the sequence right after
     * the current cursor and inspects the signal stored there. A {@code COMPLETE} or
     * {@code ERROR} signal is forwarded to {@code subscriber} and ends the wait. On later
     * passes only {@code barrier.checkAlert()} is called. Each pass ends with a one
     * nanosecond park.
     *
     * @param pendingRequest the demand counter; waiting stops once it is no longer negative
     * @param ringBuffer     the ring buffer being observed
     * @param barrier        the barrier used to wait for the sequence and to detect alerts
     * @param subscriber     the subscriber notified of a terminal signal
     * @param isRunning      consulted when the barrier is alerted
     * @param <T>            the element type carried by the signals
     * @return {@code false} if a terminal signal was delivered, or if the barrier was
     *         alerted while {@code isRunning} is {@code false}; {@code true} otherwise
     *         (including after a timeout or an interrupt, in which case the thread's
     *         interrupt flag is set again)
     */
    public static <T> boolean waitRequestOrTerminalEvent(Sequence pendingRequest, RingBuffer<MutableSignal<T>> ringBuffer, SequenceBarrier barrier, Subscriber<? super T> subscriber, AtomicBoolean isRunning) {
        final long firstSequence = ringBuffer.getCursor() + 1L;
        try {
            MutableSignal<T> first = null;
            while (pendingRequest.get() < 0L) {
                if (first != null) {
                    // The first signal was already inspected: only watch for alerts.
                    barrier.checkAlert();
                } else {
                    barrier.waitFor(firstSequence);
                    first = ringBuffer.get(firstSequence);
                    if (first.type == MutableSignal.Type.COMPLETE) {
                        try {
                            subscriber.onComplete();
                        }
                        catch (Throwable failure) {
                            Exceptions.throwIfFatal(failure);
                            subscriber.onError(failure);
                        }
                        return false;
                    }
                    if (first.type == MutableSignal.Type.ERROR) {
                        subscriber.onError(first.error);
                        return false;
                    }
                }
                LockSupport.parkNanos(1L);
            }
        }
        catch (TimeoutException ignored) {
            // A timeout simply ends the wait; the caller proceeds as if demand had arrived.
        }
        catch (AlertException alert) {
            if (!isRunning.get()) {
                return false;
            }
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
        return true;
    }

    /**
     * Wraps {@code source} in a publisher that, when subscribed, writes the source's
     * elements into {@code ringBuffer}.
     *
     * <p>The batch size used for requesting and claiming is the ring buffer size, capped by
     * the source's own capacity when the source implements {@link NonBlocking}.
     *
     * @param source     the publisher whose elements are written; must not be {@code null}
     * @param ringBuffer the ring buffer to write into
     * @param <E>        the element type
     * @return a publisher that signals completion or failure of the write
     * @throws NullPointerException if {@code source} is {@code null}
     */
    public static <E> Publisher<Void> writeWith(Publisher<? extends E> source, RingBuffer<MutableSignal<E>> ringBuffer) {
        if (source == null) {
            // The previous reflective check failed with an anonymous NPE here; keep failing fast.
            throw new NullPointerException("source");
        }

        final int capacity;
        if (source instanceof NonBlocking) {
            long sourceCapacity = ((NonBlocking) source).getCapacity();
            // The minimum never exceeds the int-sized buffer size, so the narrowing cast
            // is safe for non-negative source capacities.
            capacity = (int) Math.min(sourceCapacity, (long) ringBuffer.getBufferSize());
        } else {
            capacity = ringBuffer.getBufferSize();
        }
        return new WriteWithPublisher<E>(source, ringBuffer, capacity);
    }

    /**
     * Publisher that, for every subscriber, subscribes to {@code source} and copies its
     * elements into the ring buffer in batches of {@code capacity} slots.
     *
     * <p>The downstream subscriber never receives elements; it is only told when the
     * source completes or fails.
     */
    private static class WriteWithPublisher<E> implements Publisher<Void> {
        private final Publisher<? extends E> source;
        private final RingBuffer<MutableSignal<E>> ringBuffer;
        private final int capacity;

        /**
         * @param source     the publisher to drain
         * @param ringBuffer the ring buffer receiving the elements
         * @param capacity   the number of slots claimed, and elements requested, per batch
         */
        public WriteWithPublisher(Publisher<? extends E> source, RingBuffer<MutableSignal<E>> ringBuffer, int capacity) {
            this.source = source;
            this.ringBuffer = ringBuffer;
            this.capacity = capacity;
        }

        @Override
        public void subscribe(Subscriber<? super Void> s) {
            this.source.subscribe(new WriteWithSubscriber(s));
        }

        /**
         * Subscriber that claims a batch of ring buffer slots, requests the same number of
         * elements upstream, fills the slots as elements arrive and publishes the batch once
         * it is full.
         *
         * <p>The slot arithmetic relies on the LMAX Disruptor contract that
         * {@code RingBuffer.next(n)} returns the <em>highest</em> sequence of the claimed
         * batch, so a batch covers {@code [index - (capacity - 1), index]}.
         */
        private class WriteWithSubscriber implements Subscriber<E>, NonBlocking {
            /** Number of slots of the current batch that have not been filled yet. */
            private final Sequence pendingRequest = new Sequence(0L);
            private final Subscriber<? super Void> s;
            Subscription subscription;
            /** Highest sequence of the batch currently being filled. */
            long index;

            public WriteWithSubscriber(Subscriber<? super Void> s) {
                this.s = s;
                this.index = 0L;
            }

            /**
             * Claims {@code n} slots and requests {@code n} elements upstream. Does nothing
             * when no subscription has been received yet.
             */
            void doRequest(int n) {
                Subscription upstream = this.subscription;
                if (upstream != null) {
                    // Demand and the claimed range are recorded before request(), because
                    // the upstream may call onNext synchronously from within request().
                    this.pendingRequest.addAndGet(n);
                    this.index = ringBuffer.next(n);
                    upstream.request(n);
                }
            }

            @Override
            public void onSubscribe(Subscription s) {
                this.subscription = s;
                this.doRequest(capacity);
            }

            /**
             * Stores the element in the next free slot of the current batch; publishes the
             * batch and starts a new one when the last slot has been filled.
             *
             * @throws RuntimeException the exception supplied by
             *         {@code InsufficientCapacityException.get()} when more elements arrive
             *         than were requested
             */
            @Override
            public void onNext(E e) {
                // addAndGet returns the updated counter: slots still free after this one.
                long remaining = this.pendingRequest.addAndGet(-1L);
                if (remaining < 0L) {
                    throw InsufficientCapacityException.get();
                }

                // Slots are filled from the lowest sequence up: the first element of a batch
                // sees remaining == capacity - 1 and the last one sees remaining == 0.
                MutableSignal<E> signal = ringBuffer.get(this.index - remaining);
                signal.type = MutableSignal.Type.NEXT;
                signal.value = e;

                if (remaining == 0L && this.subscription != null) {
                    ringBuffer.publish(this.index - (capacity - 1L), this.index);
                    this.doRequest(capacity);
                }
            }

            @Override
            public void onError(Throwable t) {
                this.s.onError(t);
            }

            /**
             * Publishes the filled part of the current batch, if any, then completes the
             * downstream subscriber.
             */
            @Override
            public void onComplete() {
                long remaining = this.pendingRequest.get();
                // Publish only when a batch was claimed and at least one slot was written.
                // NOTE(review): slots that were claimed but never filled stay unpublished,
                // as before; confirm whether they need to be released for other producers.
                if (this.subscription != null && remaining >= 0L && remaining < capacity) {
                    ringBuffer.publish(this.index - (capacity - 1L), this.index - remaining);
                }
                this.s.onComplete();
            }

            @Override
            public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
                return false;
            }

            @Override
            public long getCapacity() {
                return capacity;
            }
        }
    }
}

