/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.data.async;

import com.aol.cyclops.control.ReactiveSeq;
import com.aol.cyclops.data.async.Adapter;
import com.aol.cyclops.data.async.Queue;
import com.aol.cyclops.data.async.Signal;
import com.aol.cyclops.react.async.subscription.Continueable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.lambda.Seq;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;
import org.pcollections.PVector;
import org.pcollections.TreePVector;

public class Topic<T>
implements Adapter<T> {
    private final DistributingCollection<T> distributor = new DistributingCollection();
    private volatile PMap<Seq, Queue<T>> streamToQueue = HashTreePMap.empty();
    private final Object lock = new Object();
    private volatile int index = 0;

    public Topic() {
        Queue q = new Queue();
        this.distributor.addQueue(q);
    }

    public Topic(Queue<T> q) {
        this.distributor.addQueue(q);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void disconnect(Stream<T> stream) {
        Object object = this.lock;
        synchronized (object) {
            this.distributor.removeQueue((Queue)this.streamToQueue.get(stream));
            this.streamToQueue = this.streamToQueue.minus(stream);
            --this.index;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <R> ReactiveSeq<R> connect(Function<Queue<T>, ReactiveSeq<R>> streamCreator) {
        Object object = this.lock;
        synchronized (object) {
            Queue<T> queue = this.getNextQueue();
            ReactiveSeq<R> stream = streamCreator.apply(queue);
            this.streamToQueue = this.streamToQueue.plus(stream, queue);
            return stream;
        }
    }

    @Override
    public boolean fromStream(Stream<T> stream) {
        stream.collect(Collectors.toCollection(() -> this.distributor));
        return true;
    }

    @Override
    public ReactiveSeq<CompletableFuture<T>> streamCompletableFutures() {
        return this.connect(q -> q.streamCompletableFutures());
    }

    @Override
    public ReactiveSeq<T> stream() {
        return this.connect(q -> q.stream());
    }

    @Override
    public ReactiveSeq<T> stream(Continueable s) {
        return this.connect(q -> q.stream(s));
    }

    private Queue<T> getNextQueue() {
        if (this.index >= this.distributor.getSubscribers().size()) {
            this.distributor.addQueue(new Queue());
        }
        return (Queue)this.distributor.getSubscribers().get(this.index++);
    }

    @Override
    public boolean close() {
        this.distributor.getSubscribers().forEach(it -> it.close());
        return true;
    }

    public Signal<Integer> getSizeSignal(int index) {
        return ((Queue)this.distributor.getSubscribers().get(index)).getSizeSignal();
    }

    public void setSizeSignal(int index, Signal<Integer> s) {
        ((Queue)this.distributor.getSubscribers().get(index)).setSizeSignal(s);
    }

    @Override
    public boolean offer(T data) {
        this.fromStream(Stream.of(data));
        return true;
    }

    @Override
    public <R> R visit(Function<? super Queue<T>, ? extends R> caseQueue, Function<? super Topic<T>, ? extends R> caseTopic) {
        return caseTopic.apply(this);
    }

    DistributingCollection<T> getDistributor() {
        return this.distributor;
    }

    PMap<Seq, Queue<T>> getStreamToQueue() {
        return this.streamToQueue;
    }

    static class DistributingCollection<T>
    extends ArrayList<T> {
        private static final long serialVersionUID = 1L;
        private volatile PVector<Queue<T>> subscribers = TreePVector.empty();
        private final Object lock = new Object();

        DistributingCollection() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void addQueue(Queue<T> q) {
            Object object = this.lock;
            synchronized (object) {
                this.subscribers = this.subscribers.plus(q);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void removeQueue(Queue<T> q) {
            Object object = this.lock;
            synchronized (object) {
                this.subscribers = this.subscribers.minus(q);
            }
        }

        @Override
        public boolean add(T e) {
            this.subscribers.forEach(it -> it.offer(e));
            return true;
        }

        @Override
        public boolean addAll(Collection<? extends T> c) {
            this.subscribers.forEach(it -> c.forEach((? super T next) -> it.offer(next)));
            return true;
        }

        public PVector<Queue<T>> getSubscribers() {
            return this.subscribers;
        }
    }
}

