/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.javaslang.streams;

import com.aol.cyclops.invokedynamic.ExceptionSoftener;
import com.aol.cyclops.javaslang.FromJDK;
import com.aol.cyclops.javaslang.streams.JavaslangHotStream;
import com.aol.cyclops.scheduling.util.cron.CronExpression;
import com.aol.cyclops.sequence.SequenceM;
import com.aol.cyclops.streams.spliterators.ClosingSpliterator;
import java.util.Date;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import javaslang.collection.Stream;
import uk.co.real_logic.agrona.concurrent.OneToOneConcurrentArrayQueue;

public class HotStreamImpl<T>
implements JavaslangHotStream<T> {
    private final Stream<T> stream;
    private final AtomicReferenceArray<Queue<T>> connections = new AtomicReferenceArray(10);
    private final AtomicBoolean open = new AtomicBoolean(true);
    private volatile int connected = 0;

    public HotStreamImpl(Stream<T> stream) {
        this.stream = stream;
    }

    public JavaslangHotStream<T> init(Executor exec) {
        CompletableFuture.runAsync(() -> {
            this.stream.forEach(a -> {
                int local = this.connected;
                for (int i = 0; i < local; ++i) {
                    this.connections.get(i).offer(a);
                }
            });
            this.open.set(false);
        }, exec);
        return this;
    }

    public JavaslangHotStream<T> schedule(String cron, ScheduledExecutorService ex) {
        javaslang.collection.Iterator it = this.stream.iterator();
        return this.scheduleInternal((Iterator<T>)it, cron, ex);
    }

    private JavaslangHotStream<T> scheduleInternal(Iterator<T> it, String cron, ScheduledExecutorService ex) {
        Date now = new Date();
        Date d = ((CronExpression)ExceptionSoftener.softenSupplier(() -> new CronExpression(cron)).get()).getNextInvalidTimeAfter(now);
        long delay = d.getTime() - now.getTime();
        ex.schedule(() -> {
            Iterator iterator2 = it;
            synchronized (iterator2) {
                if (it.hasNext()) {
                    try {
                        Object next = it.next();
                        int local = this.connected;
                        for (int i = 0; i < local; ++i) {
                            this.connections.get(i).offer(next);
                        }
                    }
                    finally {
                        this.scheduleInternal(it, cron, ex);
                    }
                } else {
                    this.open.set(false);
                }
            }
        }, delay, TimeUnit.MILLISECONDS);
        return this;
    }

    public JavaslangHotStream<T> scheduleFixedDelay(long delay, ScheduledExecutorService ex) {
        javaslang.collection.Iterator it = this.stream.iterator();
        ex.scheduleWithFixedDelay(() -> this.lambda$scheduleFixedDelay$99((Iterator)it), delay, delay, TimeUnit.MILLISECONDS);
        return this;
    }

    public JavaslangHotStream<T> scheduleFixedRate(long rate, ScheduledExecutorService ex) {
        javaslang.collection.Iterator it = this.stream.iterator();
        ex.scheduleAtFixedRate(() -> this.lambda$scheduleFixedRate$100((Iterator)it), 0L, rate, TimeUnit.MILLISECONDS);
        return this;
    }

    @Override
    public Stream<T> connect() {
        return this.connect((Queue<T>)new OneToOneConcurrentArrayQueue(256));
    }

    @Override
    public Stream<T> connect(Queue<T> queue) {
        this.connections.getAndSet(this.connected, queue);
        ++this.connected;
        return FromJDK.stream(SequenceM.fromStream(StreamSupport.stream(new ClosingSpliterator(Long.MAX_VALUE, queue, this.open), false)));
    }

    @Override
    public <R extends Stream<T>> R connectTo(Queue<T> queue, Function<Stream<T>, R> to) {
        return (R)((Stream)to.apply(this.connect(queue)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private /* synthetic */ void lambda$scheduleFixedRate$100(Iterator iterator) {
        Iterator iterator2 = iterator;
        synchronized (iterator2) {
            if (iterator.hasNext()) {
                Object next = iterator.next();
                int local = this.connected;
                for (int i = 0; i < local; ++i) {
                    this.connections.get(i).offer(next);
                }
            } else {
                this.open.set(false);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private /* synthetic */ void lambda$scheduleFixedDelay$99(Iterator iterator) {
        Iterator iterator2 = iterator;
        synchronized (iterator2) {
            if (iterator.hasNext()) {
                Object next = iterator.next();
                int local = this.connected;
                for (int i = 0; i < local; ++i) {
                    this.connections.get(i).offer(next);
                }
            } else {
                this.open.set(false);
            }
        }
    }
}

