/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.internal.stream;

import com.aol.cyclops.util.ExceptionSoftener;
import com.aol.cyclops.util.stream.scheduling.cron.CronExpression;
import java.util.Date;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

/**
 * Base support for a "hot" stream that pulls elements from an {@link Iterator} on a
 * {@link ScheduledExecutorService} and offers every element to each connected queue.
 *
 * <p>The fields are {@code protected}; registration of queues in {@link #connections} and
 * maintenance of {@link #connected} are not performed in this class (presumably done by
 * subclasses - confirm against them).
 *
 * @param <T> element type produced by the source iterator
 */
public class IteratorHotStream<T> {
    /** Queues that receive each emitted element; slots {@code [0, connected)} are read on emit. Capacity is fixed at 10. */
    protected final AtomicReferenceArray<Queue<T>> connections = new AtomicReferenceArray<>(10);
    /** Set to {@code false} once a scheduled task observes that the source iterator is exhausted. */
    protected final AtomicBoolean open = new AtomicBoolean(true);
    /** Number of leading slots of {@link #connections} that are offered elements. */
    protected volatile int connected = 0;
    /** Pause gate: an incomplete future means "paused", a completed one means "running". Starts completed. */
    protected final AtomicReference<CompletableFuture<Void>> pause =
            new AtomicReference<>(CompletableFuture.<Void>completedFuture(null));

    /**
     * Reports whether the stream is currently paused.
     *
     * @return {@code true} after {@link #pause()} and before the matching {@link #unpause()};
     *         {@code false} otherwise (including on a freshly constructed instance)
     */
    public boolean isPaused() {
        // The gate is completed while running, so "paused" is the NOT-done state.
        return !this.pause.get().isDone();
    }

    /** Completes the current pause gate. Has no effect if the gate is already completed. */
    protected void unpause() {
        // complete() is a no-op on an already-completed future, so no isDone() pre-check is needed.
        this.pause.get().complete(null);
    }

    /**
     * Installs a fresh, incomplete pause gate. If the stream is already paused the existing
     * gate is kept: replacing it would leave the previous future permanently incomplete,
     * because {@link #unpause()} only completes the gate that is current at that time.
     */
    protected void pause() {
        this.pause.updateAndGet(current -> current.isDone() ? new CompletableFuture<Void>() : current);
    }

    /**
     * Schedules a single emission at the next time matching {@code cron}; after emitting,
     * the task re-schedules itself for the following matching time. When the iterator has
     * no more elements, {@link #open} is cleared and no further run is scheduled.
     *
     * @param it   source of elements; also used as the monitor guarding each emission
     * @param cron cron expression text handed to {@link CronExpression}
     * @param ex   executor on which emissions run
     */
    protected void scheduleInternal(Iterator<T> it, String cron, ScheduledExecutorService ex) {
        Date now = new Date();
        CronExpression expression =
                (CronExpression) ExceptionSoftener.softenSupplier(() -> new CronExpression(cron)).get();
        Date next = expression.getNextValidTimeAfter(now);
        long delay = next.getTime() - now.getTime();
        ex.schedule(() -> {
            synchronized (it) {
                if (it.hasNext()) {
                    try {
                        this.broadcast(it.next());
                    } finally {
                        // Re-schedule even if next()/offer() threw, so one failure does not stop the stream.
                        this.scheduleInternal(it, cron, ex);
                    }
                } else {
                    this.open.set(false);
                }
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    /**
     * Emits one element per run with a fixed delay between the end of one run and the start
     * of the next. The first run happens after {@code delay} milliseconds.
     *
     * <p>NOTE(review): the periodic task is not cancelled once the iterator is exhausted; it
     * keeps running and only re-clears {@link #open}.
     *
     * @param it    source of elements; also used as the monitor guarding each emission
     * @param delay initial delay and inter-run delay, in milliseconds
     * @param ex    executor on which emissions run
     * @return this instance
     */
    protected IteratorHotStream<T> scheduleFixedDelayInternal(Iterator<T> it, long delay, ScheduledExecutorService ex) {
        ex.scheduleWithFixedDelay(() -> this.emitOrClose(it), delay, delay, TimeUnit.MILLISECONDS);
        return this;
    }

    /**
     * Emits one element per run at a fixed rate, starting immediately (initial delay 0).
     *
     * <p>NOTE(review): the periodic task is not cancelled once the iterator is exhausted; it
     * keeps running and only re-clears {@link #open}.
     *
     * @param it   source of elements; also used as the monitor guarding each emission
     * @param rate period between run starts, in milliseconds
     * @param ex   executor on which emissions run
     * @return this instance
     */
    protected IteratorHotStream<T> scheduleFixedRate(Iterator<T> it, long rate, ScheduledExecutorService ex) {
        ex.scheduleAtFixedRate(() -> this.emitOrClose(it), 0L, rate, TimeUnit.MILLISECONDS);
        return this;
    }

    /** Under the iterator's monitor: broadcasts the next element if there is one, otherwise clears {@link #open}. */
    private void emitOrClose(Iterator<T> it) {
        synchronized (it) {
            if (it.hasNext()) {
                this.broadcast(it.next());
            } else {
                this.open.set(false);
            }
        }
    }

    /** Offers {@code element} to every queue in slots {@code [0, connected)}, using one snapshot of {@link #connected}. */
    private void broadcast(T element) {
        int local = this.connected;
        for (int i = 0; i < local; ++i) {
            this.connections.get(i).offer(element);
        }
    }
}

