/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.types.stream.reactive;

import com.aol.cyclops.control.Eval;
import com.aol.cyclops.control.ReactiveSeq;
import com.aol.cyclops.data.async.Queue;
import com.aol.cyclops.data.async.QueueFactory;
import com.aol.cyclops.data.collections.extensions.standard.QueueX;
import com.aol.cyclops.types.futurestream.Continuation;
import com.aol.cyclops.types.futurestream.LazyFutureStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class QueueBasedSubscriber<T>
implements Subscriber<T> {
    private final int maxConcurrency;
    private final QueueFactory<T> factory;
    protected volatile Queue<T> queue;
    volatile Subscription subscription;
    private volatile LazyFutureStream<T> stream;
    private volatile Supplier<LazyFutureStream<T>> futureStream = Eval.later(this::genStream);
    private volatile Supplier<Stream<T>> jdkStream = Eval.later(this::genJdkStream);
    private volatile Supplier<ReactiveSeq<T>> reactiveSeq = Eval.later(() -> ReactiveSeq.fromStream(this.jdkStream.get()));
    private volatile Consumer<Throwable> errorHandler;
    private final Counter counter;

    public static <T> QueueBasedSubscriber<T> subscriber(Counter counter, int maxConcurrency) {
        return new QueueBasedSubscriber<T>(counter, maxConcurrency);
    }

    public static <T> QueueBasedSubscriber<T> subscriber(Queue<T> q, Counter counter, int maxConcurrency) {
        return new QueueBasedSubscriber<T>(q, counter, maxConcurrency);
    }

    public static <T> QueueBasedSubscriber<T> subscriber(QueueFactory<T> factory2, Counter counter, int maxConcurrency) {
        return new QueueBasedSubscriber<T>(factory2, counter, maxConcurrency);
    }

    private Stream<T> genJdkStream() {
        com.aol.cyclops.react.async.subscription.Subscription subscription = new com.aol.cyclops.react.async.subscription.Subscription();
        return this.queue.stream(subscription);
    }

    private LazyFutureStream<T> genStream() {
        com.aol.cyclops.react.async.subscription.Subscription subscription = new com.aol.cyclops.react.async.subscription.Subscription();
        return LazyFutureStream.of(new Object[0]).withSubscription(subscription).fromStream(this.queue.stream(subscription));
    }

    public QueueBasedSubscriber(final Counter counter, int maxConcurrency) {
        this.maxConcurrency = maxConcurrency;
        this.factory = null;
        this.counter = counter;
        this.queue = new Queue<T>(){

            @Override
            public T get() {
                counter.subscription.forEach(s -> s.request(1L));
                return super.get();
            }
        };
    }

    private QueueBasedSubscriber(Queue<T> q, Counter counter, int maxConcurrency) {
        this.factory = null;
        this.maxConcurrency = maxConcurrency;
        this.counter = counter;
        this.queue = q;
    }

    private QueueBasedSubscriber(QueueFactory<T> factory2, final Counter counter, int maxConcurrency) {
        this.counter = counter;
        this.factory = factory2;
        this.maxConcurrency = maxConcurrency;
        this.queue = new Queue<T>(factory2){

            @Override
            public T get() {
                counter.subscription.forEach(s -> s.request(1L));
                return super.get();
            }
        };
    }

    public LazyFutureStream<T> futureStream() {
        this.stream = this.futureStream.get();
        return this.stream;
    }

    public Stream<T> jdkStream() {
        return this.jdkStream.get();
    }

    public ReactiveSeq<T> reactiveSeq() {
        return this.reactiveSeq.get();
    }

    public void onSubscribe(Subscription s) {
        Objects.requireNonNull(s);
        if (this.subscription != null) {
            this.subscription.cancel();
            s.cancel();
            return;
        }
        this.subscription = s;
        while (this.counter.subscription.size() > this.maxConcurrency) {
            LockSupport.parkNanos(100L);
        }
        this.counter.subscription.plus((Object)this.subscription);
        s.request(1L);
    }

    public void onNext(T t) {
        Objects.requireNonNull(t);
        this.queue.add(t);
        ++this.counter.added;
    }

    public void onError(Throwable t) {
        Objects.requireNonNull(t);
        if (this.stream != null) {
            this.stream.getErrorHandler().orElse(h -> {}).accept(t);
        }
        if (this.errorHandler != null) {
            this.errorHandler.accept(t);
        }
    }

    public void onComplete() {
        this.counter.active.decrementAndGet();
        this.counter.subscription.minus(this.subscription);
        boolean set = false;
        if (this.queue != null && this.counter.active.get() == 0L && this.counter.completable) {
            this.counter.closed = true;
            this.queue.addContinuation(new Continuation(() -> {
                ArrayList<T> current = new ArrayList<T>();
                while (this.queue.size() > 0) {
                    current.add(this.queue.get());
                }
                throw new Queue.ClosedQueueException(current);
            }));
            this.queue.close();
        }
    }

    public void close() {
        this.counter.completable = true;
        boolean set = false;
        if (this.queue != null && this.counter.active.get() == 0L) {
            this.counter.closed = true;
            this.queue.addContinuation(new Continuation(() -> {
                throw new Queue.ClosedQueueException();
            }));
            this.queue.close();
        }
    }

    public void addContinuation(Continuation c) {
        this.queue.addContinuation(c);
    }

    public Queue<T> getQueue() {
        return this.queue;
    }

    public Subscription getSubscription() {
        return this.subscription;
    }

    public void setErrorHandler(Consumer<Throwable> errorHandler) {
        this.errorHandler = errorHandler;
    }

    public static class Counter {
        public AtomicLong active = new AtomicLong(0L);
        volatile boolean completable = false;
        final QueueX<Subscription> subscription = QueueX.fromIterable(Collectors.toCollection(() -> new ConcurrentLinkedQueue()), Arrays.asList(new Subscription[0]));
        volatile boolean closed = false;
        volatile int added = 0;
    }
}

