/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.control;

import com.aol.cyclops.control.Eval;
import com.aol.cyclops.control.FutureW;
import com.aol.cyclops.control.LazyReact;
import com.aol.cyclops.control.Maybe;
import com.aol.cyclops.control.ReactiveSeq;
import com.aol.cyclops.control.Try;
import com.aol.cyclops.control.Xor;
import com.aol.cyclops.data.LazyImmutable;
import com.aol.cyclops.data.async.Adapter;
import com.aol.cyclops.data.collections.extensions.persistent.PMapX;
import com.aol.cyclops.data.collections.extensions.standard.ListX;
import com.aol.cyclops.react.threads.SequentialElasticPools;
import com.aol.cyclops.types.futurestream.LazyFutureStream;
import com.aol.cyclops.types.stream.reactive.SeqSubscriber;
import com.aol.cyclops.types.stream.reactive.ValueSubscriber;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/**
 * A keyed registry of {@link Adapter} instances, backed by a
 * {@link ConcurrentHashMap}.
 *
 * <p>Values can be pushed to, read from, subscribed to, or published into the
 * adapter registered under a given key. Instances are created through the
 * {@link #of()} / {@link #of(Map)} factories; the constructor is private.
 *
 * <p>Because the backing map is a {@code ConcurrentHashMap}, {@code null} keys
 * and {@code null} adapters are rejected by the map itself with a
 * {@link NullPointerException}.
 *
 * @param <K> type of the keys adapters are registered under
 * @param <V> type of the values carried by the adapters
 */
public class Pipes<K, V> {
    private final ConcurrentMap<K, Adapter<V>> registered = new ConcurrentHashMap<>();

    /**
     * @return the number of adapters currently registered
     */
    public int size() {
        return registered.size();
    }

    /**
     * @return a {@link PMapX} built from the current key-to-adapter registrations
     */
    public PMapX<K, Adapter<V>> registered() {
        return PMapX.fromMap(registered);
    }

    /**
     * Creates an empty {@code Pipes} registry.
     *
     * @param <K> key type
     * @param <V> value type
     * @return a new registry with no adapters
     */
    public static <K, V> Pipes<K, V> of() {
        return new Pipes<>();
    }

    /**
     * Creates a registry pre-populated with a copy of the supplied mappings.
     *
     * @param registered initial key-to-adapter mappings; must not be {@code null}
     * @param <K> key type
     * @param <V> value type
     * @return a new registry containing every entry of {@code registered}
     * @throws NullPointerException if {@code registered} is {@code null}
     */
    public static <K, V> Pipes<K, V> of(final Map<K, Adapter<V>> registered) {
        Objects.requireNonNull(registered);
        final Pipes<K, V> pipes = new Pipes<>();
        pipes.registered.putAll(registered);
        return pipes;
    }

    /**
     * Offers a value to the adapter registered under {@code key}.
     * If no adapter is registered the value is silently dropped.
     *
     * @param key   key of the target adapter
     * @param value value to offer
     */
    public void push(final K key, final V value) {
        final Adapter<V> adapter = registered.get(key);
        if (adapter != null) {
            adapter.offer(value);
        }
    }

    /**
     * Looks up the adapter registered under {@code key}.
     *
     * @param key key to look up
     * @return a {@link Maybe} holding the adapter, or an empty {@code Maybe}
     *         when nothing is registered under {@code key}
     */
    public Maybe<Adapter<V>> get(final K key) {
        return Maybe.ofNullable(registered.get(key));
    }

    /**
     * @param key key of the adapter to read from
     * @return the adapter's {@code futureStream()}, or an empty {@link Maybe}
     *         when no adapter is registered
     */
    public Maybe<LazyFutureStream<V>> futureStream(final K key) {
        return get(key).map(a -> a.futureStream());
    }

    /**
     * @param key     key of the adapter to read from
     * @param reactor {@link LazyReact} passed through to the adapter's
     *                {@code futureStream(LazyReact)}
     * @return the resulting stream, or an empty {@link Maybe} when no adapter
     *         is registered
     */
    public Maybe<LazyFutureStream<V>> futureStream(final K key, final LazyReact reactor) {
        return get(key).map(a -> a.futureStream(reactor));
    }

    /**
     * @param key key of the adapter to read from
     * @return the adapter's {@code stream()}, or an empty {@link Maybe} when no
     *         adapter is registered
     */
    public Maybe<ReactiveSeq<V>> reactiveSeq(final K key) {
        return get(key).map(a -> a.stream());
    }

    /**
     * Subscribes to the adapter's stream and collects at most {@code x} values.
     *
     * @param key key of the adapter to read from
     * @param x   maximum number of values to collect
     * @return the collected values, or an empty list when no adapter is registered
     */
    public ListX<V> xValues(final K key, final long x) {
        final SeqSubscriber<V> sub = SeqSubscriber.subscriber();
        return get(key).peek(a -> a.stream().subscribe(sub))
                       .map(a -> sub.stream().limit(x).toListX())
                       .orElse(ListX.empty());
    }

    /**
     * Subscribes a {@link ValueSubscriber} to the adapter's stream and returns
     * its result as a {@link Maybe}.
     *
     * @param key key of the adapter to read from
     * @return the subscriber's {@code toMaybe()} result, or an empty
     *         {@code Maybe} when no adapter is registered
     */
    public Maybe<V> oneValue(final K key) {
        final ValueSubscriber<V> sub = ValueSubscriber.subscriber();
        return get(key).peek(a -> a.stream().subscribe(sub))
                       .flatMap(a -> sub.toMaybe());
    }

    /**
     * Subscribes a {@link ValueSubscriber} to the adapter's stream and returns
     * its result as an {@link Xor}.
     *
     * @param key key of the adapter to read from
     * @return the subscriber's {@code toXor()} result, or a secondary
     *         {@code Xor} holding a {@link NoSuchElementException} when no
     *         adapter is registered
     */
    public Xor<Throwable, V> oneOrError(final K key) {
        final ValueSubscriber<V> sub = ValueSubscriber.subscriber();
        return get(key).peek(a -> a.stream().subscribe(sub))
                       .map(a -> sub.toXor())
                       .orElse(Xor.secondary(new NoSuchElementException("no adapter for key " + key)));
    }

    /**
     * Subscribes a {@link ValueSubscriber} to the adapter's stream and returns
     * its result as a {@link Try} for the given exception classes.
     *
     * @param key     key of the adapter to read from
     * @param classes exception classes handed to the subscriber's {@code toTry}
     * @param <X>     exception type captured by the returned {@code Try}
     * @return the subscriber's {@code toTry(classes)} result, or an empty
     *         {@link Maybe} when no adapter is registered
     */
    public <X extends Throwable> Maybe<Try<V, X>> oneValueOrError(final K key, final Class<X>... classes) {
        final ValueSubscriber<V> sub = ValueSubscriber.subscriber();
        return get(key).peek(a -> a.stream().subscribe(sub))
                       .map(a -> sub.toTry(classes));
    }

    /**
     * Same as {@link #oneValueOrError(Object, Class[])} with
     * {@code Throwable.class} as the only exception class.
     *
     * @param key key of the adapter to read from
     * @return the subscriber's {@code toTry(Throwable.class)} result, or an
     *         empty {@link Maybe} when no adapter is registered
     */
    public Maybe<Try<V, Throwable>> oneValueOrError(final K key) {
        final ValueSubscriber<V> sub = ValueSubscriber.subscriber();
        return get(key).peek(a -> a.stream().subscribe(sub))
                       .map(a -> sub.toTry(Throwable.class));
    }

    /**
     * Reads one value from the adapter on the supplied executor.
     *
     * <p>The subscription and both {@code get()} calls run inside the async
     * task, so anything they throw completes the underlying
     * {@link CompletableFuture} exceptionally.
     * NOTE(review): when no adapter is registered the outer {@code Maybe.get()}
     * is invoked on an empty {@code Maybe}; presumably that throws - confirm
     * against the {@code Maybe} contract.
     *
     * @param key key of the adapter to read from
     * @param ex  executor the read is performed on
     * @return a {@link FutureW} wrapping the asynchronous read
     */
    public FutureW<V> oneOrErrorAsync(final K key, final Executor ex) {
        final CompletableFuture<V> cf = CompletableFuture.supplyAsync(() -> {
            final ValueSubscriber<V> sub = ValueSubscriber.subscriber();
            return get(key).peek(a -> a.stream().subscribe(sub))
                           .map(a -> sub.toMaybe().get())
                           .get();
        }, ex);
        return FutureW.of(cf);
    }

    /**
     * Returns an {@link Eval} that yields the subscriber's current
     * {@code toMaybe()} result on every evaluation.
     *
     * <p>The first evaluation only marks the request flag; every later
     * evaluation calls {@code requestOne()} on the subscriber before reading.
     *
     * @param key key of the adapter to read from
     * @return an {@code Eval} of the next value, or {@code Eval.now(Maybe.none())}
     *         when no adapter is registered
     */
    public Eval<Maybe<V>> nextValue(final K key) {
        final ValueSubscriber<V> sub = ValueSubscriber.subscriber();
        final LazyImmutable<Boolean> requested = LazyImmutable.def();
        final Maybe<Eval<Maybe<V>>> nested = get(key).peek(a -> a.stream().subscribe(sub))
                                                     .map(a -> Eval.always(() -> advance(sub, requested)));
        return nested.orElse(Eval.now(Maybe.<V>none()));
    }

    /**
     * Like {@link #nextValue(Object)} but unwraps the result, yielding
     * {@code null} when the subscriber's {@code toMaybe()} is empty.
     *
     * @param key key of the adapter to read from
     * @return an {@code Eval} of the next value or {@code null}; when no adapter
     *         is registered, {@code Eval.now(null)}
     */
    public Eval<V> nextOrNull(final K key) {
        final ValueSubscriber<V> sub = ValueSubscriber.subscriber();
        final LazyImmutable<Boolean> requested = LazyImmutable.def();
        final Maybe<Eval<V>> nested = get(key).peek(a -> a.stream().subscribe(sub))
                                              .map(a -> Eval.always(() -> advance(sub, requested).orElse(null)));
        return nested.orElse(Eval.<V>now(null));
    }

    /**
     * Registers {@code adapter} under {@code key}, replacing any adapter
     * previously registered under the same key.
     *
     * @param key     key to register under
     * @param adapter adapter to register
     */
    public void register(final K key, final Adapter<V> adapter) {
        registered.put(key, adapter);
    }

    /**
     * Removes every registration. The adapters themselves are not closed here.
     */
    public void clear() {
        registered.clear();
    }

    /**
     * Subscribes {@code subscriber} to the stream of the adapter registered
     * under {@code key}.
     *
     * @param key        key of the adapter to subscribe to
     * @param subscriber subscriber to attach
     * @throws NullPointerException if no adapter is registered under {@code key}
     */
    public void subscribeTo(final K key, final Subscriber<V> subscriber) {
        requireAdapter(key).stream().subscribe(subscriber);
    }

    /**
     * Runs {@link #subscribeTo(Object, Subscriber)} on {@code subscribeOn}.
     *
     * <p>The future returned by {@link CompletableFuture#runAsync} is discarded,
     * so a failure inside the task (for example an unregistered key) is not
     * reported to the caller.
     *
     * @param key         key of the adapter to subscribe to
     * @param subscriber  subscriber to attach
     * @param subscribeOn executor the subscription is performed on
     */
    public void subscribeTo(final K key, final Subscriber<V> subscriber, final Executor subscribeOn) {
        CompletableFuture.runAsync(() -> subscribeTo(key, subscriber), subscribeOn);
    }

    /**
     * Feeds everything emitted by {@code publisher} into the adapter registered
     * under {@code key}.
     *
     * <p>The adapter is resolved before subscribing, so an unregistered key
     * fails fast without leaving a subscription attached to the publisher.
     *
     * @param key       key of the target adapter
     * @param publisher source of values
     * @throws NullPointerException if no adapter is registered under {@code key}
     */
    public void publishTo(final K key, final Publisher<V> publisher) {
        final Adapter<V> target = requireAdapter(key);
        final SeqSubscriber<V> sub = SeqSubscriber.subscriber();
        publisher.subscribe(sub);
        target.fromStream(sub.stream());
    }

    /**
     * Performs {@link #publishTo(Object, Publisher)} via
     * {@code SequentialElasticPools.simpleReact}.
     *
     * @param key       key of the target adapter
     * @param publisher source of values
     */
    public void publishToAsync(final K key, final Publisher<V> publisher) {
        SequentialElasticPools.simpleReact.react(er -> er.of(publisher).peek(p -> this.publishTo(key, (Publisher<V>)p)));
    }

    /**
     * Closes the adapter registered under {@code key}, if there is one.
     * The registration itself is left in place.
     *
     * <p>Note that the key parameter is a {@code String} regardless of
     * {@code K}; the lookup relies on {@link Map#get(Object)} accepting any
     * object.
     *
     * @param key key of the adapter to close
     */
    public void close(final String key) {
        final Adapter<V> adapter = registered.get(key);
        if (adapter != null) {
            adapter.close();
        }
    }

    /**
     * Returns the adapter for {@code key} or fails with a descriptive message.
     */
    private Adapter<V> requireAdapter(final K key) {
        return Objects.requireNonNull(registered.get(key), "no adapter for key " + key);
    }

    /**
     * Requests one more element on every call except the first, then reads the
     * subscriber's current value.
     */
    private Maybe<V> advance(final ValueSubscriber<V> sub, final LazyImmutable<Boolean> requested) {
        if (requested.isSet()) {
            sub.requestOne();
        } else {
            requested.setOnce(true);
        }
        return sub.toMaybe();
    }

    private Pipes() {
    }
}

