/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.javaslang.streams;

import com.aol.cyclops.javaslang.streams.PausableJavaslangHotStream;
import com.aol.cyclops.javaslang.streams.StreamUtils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.LockSupport;
import javaslang.collection.LazyStream;
import javaslang.collection.Stream;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

public class HotStreamTest {
    static final Executor exec = Executors.newFixedThreadPool(1);
    volatile Object value;

    @Test
    public void hotStream() throws InterruptedException {
        this.value = null;
        CountDownLatch latch = new CountDownLatch(1);
        StreamUtils.hotStream((Stream)Stream.of((Object[])new Integer[]{1, 2, 3}).peek(v -> {
            this.value = v;
        }).peek(v -> latch.countDown()), (Executor)exec);
        latch.await();
        Assert.assertTrue((this.value != null ? 1 : 0) != 0);
    }

    @Test
    public void hotStreamConnect() throws InterruptedException {
        for (int i = 0; i < 1000; ++i) {
            System.out.println(i);
            this.value = null;
            CountDownLatch latch = new CountDownLatch(1);
            StreamUtils.futureOperations((Stream)StreamUtils.hotStream((Stream)LazyStream.range((int)0, (int)Integer.MAX_VALUE).take(100L).peek(v -> {
                this.value = v;
            }).peek(v -> latch.countDown()).peek(System.out::println), (Executor)exec).connect().take(100L), (Executor)ForkJoinPool.commonPool()).forEach(System.out::println);
            latch.await();
            Assert.assertTrue((this.value != null ? 1 : 0) != 0);
        }
    }

    @Test
    public void hotStreamConnectBlockingQueue() throws InterruptedException {
        this.value = null;
        CountDownLatch latch = new CountDownLatch(1);
        StreamUtils.futureOperations((Stream)StreamUtils.hotStream((Stream)LazyStream.range((int)0, (int)Integer.MAX_VALUE).take(1000L).peek(v -> {
            this.value = v;
        }).peek(v -> latch.countDown()), (Executor)exec).connect(new LinkedBlockingQueue()).take(100L), (Executor)ForkJoinPool.commonPool()).forEach(System.out::println);
        latch.await();
        Assert.assertTrue((this.value != null ? 1 : 0) != 0);
    }

    @Test
    @Ignore
    public void hotStreamConnectPausable() throws InterruptedException {
        this.value = null;
        CountDownLatch latch = new CountDownLatch(1);
        PausableJavaslangHotStream s = StreamUtils.pausableHotStream((Stream)Stream.range((int)0, (int)Integer.MAX_VALUE).take(1000L).peek(v -> {
            this.value = v;
        }).peek(v -> latch.countDown()), (Executor)exec);
        StreamUtils.futureOperations((Stream)s.connect(new LinkedBlockingQueue()).take(100L), (Executor)ForkJoinPool.commonPool()).forEach(System.out::println);
        s.pause();
        Object oldValue = this.value;
        s.unpause();
        LockSupport.parkNanos(2000L);
        s.pause();
        System.out.println(this.value);
        Assert.assertTrue((this.value != oldValue ? 1 : 0) != 0);
        s.unpause();
        latch.await();
        Assert.assertTrue((this.value != null ? 1 : 0) != 0);
    }

    @Test
    public void hotStreamConnectPausableConnect() throws InterruptedException {
        this.value = null;
        CountDownLatch latch = new CountDownLatch(1);
        PausableJavaslangHotStream s = StreamUtils.pausableHotStream((Stream)Stream.range((int)0, (int)Integer.MAX_VALUE).take(10000L).peek(v -> {
            this.value = v;
        }).peek(v -> latch.countDown()), (Executor)exec);
        StreamUtils.futureOperations((Stream)s.connect().take(100L), (Executor)ForkJoinPool.commonPool()).forEach(System.out::println);
        Object oldValue = this.value;
        s.pause();
        s.unpause();
        LockSupport.parkNanos(1000L);
        s.pause();
        System.out.println(this.value);
        Assert.assertTrue((this.value != oldValue ? 1 : 0) != 0);
        s.unpause();
        latch.await();
        Assert.assertTrue((this.value != null ? 1 : 0) != 0);
    }
}

