/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.internal.stream;

import com.aol.cyclops.control.ReactiveSeq;
import com.aol.cyclops.internal.stream.IteratorHotStream;
import com.aol.cyclops.internal.stream.spliterators.ClosingSpliterator;
import com.aol.cyclops.types.stream.HotStream;
import com.aol.cyclops.util.stream.StreamUtils;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.agrona.concurrent.OneToOneConcurrentArrayQueue;

public abstract class BaseHotStreamImpl<T>
extends IteratorHotStream<T>
implements HotStream<T> {
    protected final Stream<T> stream;

    public BaseHotStreamImpl(Stream<T> stream) {
        this.stream = stream;
    }

    public HotStream<T> paused(Executor exec) {
        this.pause();
        return this.init(exec);
    }

    public abstract HotStream<T> init(Executor var1);

    public HotStream<T> schedule(String cron, ScheduledExecutorService ex) {
        Iterator it = this.stream.iterator();
        this.scheduleInternal(it, cron, ex);
        return this;
    }

    public HotStream<T> scheduleFixedDelay(long delay, ScheduledExecutorService ex) {
        Iterator it = this.stream.iterator();
        this.scheduleFixedDelayInternal(it, delay, ex);
        return this;
    }

    public HotStream<T> scheduleFixedRate(long rate, ScheduledExecutorService ex) {
        Iterator it = this.stream.iterator();
        this.scheduleFixedRate(it, rate, ex);
        return this;
    }

    @Override
    public ReactiveSeq<T> connect() {
        return this.connect((Queue<T>)new OneToOneConcurrentArrayQueue(256));
    }

    @Override
    public ReactiveSeq<T> connect(Queue<T> queue) {
        this.connections.getAndSet(this.connected, queue);
        ++this.connected;
        this.unpause();
        return StreamUtils.reactiveSeq(StreamSupport.stream(new ClosingSpliterator(Long.MAX_VALUE, queue, this.open), false), Optional.empty());
    }

    @Override
    public <R extends Stream<T>> R connectTo(Queue<T> queue, Function<ReactiveSeq<T>, R> to) {
        return (R)((Stream)to.apply(this.connect(queue)));
    }
}

