/*
 * Decompiled with CFR 0.152.
 */
package org.apache.edgent.topology.plumbing;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.function.Function;
import org.apache.edgent.function.ToIntFunction;
import org.apache.edgent.oplet.plumbing.Barrier;
import org.apache.edgent.oplet.plumbing.Isolate;
import org.apache.edgent.oplet.plumbing.PressureReliever;
import org.apache.edgent.oplet.plumbing.UnorderedIsolate;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.plumbing.LoadBalancedSplitter;

public class PlumbingStreams {
    public static <T> TStream<T> blockingDelay(TStream<T> stream, long delay, TimeUnit unit) {
        return stream.map((Function & Serializable)t -> {
            try {
                Thread.sleep(unit.toMillis(delay));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return t;
        });
    }

    public static <T> TStream<T> blockingThrottle(TStream<T> stream, long delay, TimeUnit unit) {
        return stream.map(PlumbingStreams.blockingThrottle(delay, unit));
    }

    private static <T> Function<T, T> blockingThrottle(long delay, TimeUnit unit) {
        long[] nextTupleTime = new long[]{0L};
        return (Function & Serializable)t -> {
            long now = System.currentTimeMillis();
            if (nextTupleTime[0] != 0L && now < nextTupleTime[0]) {
                try {
                    Thread.sleep(nextTupleTime[0] - now);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                now = System.currentTimeMillis();
            }
            nextTupleTime[0] = now + unit.toMillis(delay);
            return t;
        };
    }

    public static <T> TStream<T> blockingOneShotDelay(TStream<T> stream, long delay, TimeUnit unit) {
        return stream.map(PlumbingStreams.blockingOneShotDelay(delay, unit));
    }

    private static <T> Function<T, T> blockingOneShotDelay(long delay, TimeUnit unit) {
        long[] initialDelay = new long[]{unit.toMillis(delay)};
        return (Function & Serializable)t -> {
            if (initialDelay[0] != -1L) {
                try {
                    Thread.sleep(initialDelay[0]);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                initialDelay[0] = -1L;
            }
            return t;
        };
    }

    public static <T, K> TStream<T> pressureReliever(TStream<T> stream, Function<T, K> keyFunction, int count) {
        return stream.pipe(new PressureReliever(count, keyFunction));
    }

    public static <T> TStream<T> isolate(TStream<T> stream, boolean ordered) {
        return stream.pipe(ordered ? new Isolate() : new UnorderedIsolate());
    }

    public static <T> TStream<T> isolate(TStream<T> stream, int queueCapacity) {
        return stream.pipe(new Isolate(queueCapacity));
    }

    public static <T, U, R> TStream<R> concurrentMap(TStream<T> stream, List<Function<T, U>> mappers, Function<List<U>, R> combiner) {
        Objects.requireNonNull(stream, "stream");
        Objects.requireNonNull(mappers, "mappers");
        Objects.requireNonNull(combiner, "combiner");
        ArrayList<Function<TStream<T>, TStream<U>>> pipelines = new ArrayList<Function<TStream<T>, TStream<U>>>();
        for (Function mapper : mappers) {
            pipelines.add((Function & Serializable)s -> s.map(mapper));
        }
        return PlumbingStreams.concurrent(stream, pipelines, combiner);
    }

    public static <T, U, R> TStream<R> concurrent(TStream<T> stream, List<Function<TStream<T>, TStream<U>>> pipelines, Function<List<U>, R> combiner) {
        Objects.requireNonNull(stream, "stream");
        Objects.requireNonNull(pipelines, "pipelines");
        Objects.requireNonNull(combiner, "combiner");
        int barrierQueueCapacity = 10;
        ArrayList<TStream<T>> fanouts = new ArrayList<TStream<T>>(pipelines.size());
        for (int i = 0; i < pipelines.size(); ++i) {
            fanouts.add(PlumbingStreams.isolate(stream, 1).tag("concurrent.isolated-ch" + i));
        }
        ArrayList<TStream<T>> results = new ArrayList<TStream<T>>(pipelines.size());
        int ch = 0;
        for (Function<TStream<T>, TStream<U>> pipeline : pipelines) {
            results.add(((TStream)pipeline.apply(fanouts.get(ch))).tag("concurrent-ch" + ch));
            ++ch;
        }
        TStream<List<List<U>>> barrier = PlumbingStreams.barrier(results, barrierQueueCapacity).tag("concurrent.barrier");
        return barrier.map(combiner);
    }

    public static <T> TStream<List<T>> barrier(List<TStream<T>> streams) {
        return PlumbingStreams.barrier(streams, 1);
    }

    public static <T> TStream<List<T>> barrier(List<TStream<T>> streams, int queueCapacity) {
        ArrayList<TStream<T>> others = new ArrayList<TStream<T>>(streams);
        TStream s1 = (TStream)others.remove(0);
        return s1.fanin(new Barrier(queueCapacity), others);
    }

    public static <T, U> TStream<U> parallelMap(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<T, Integer, U> mapper) {
        BiFunction & Serializable pipeline = (BiFunction & Serializable)(s, ch) -> s.map((Function & Serializable)t -> mapper.apply(t, ch));
        return PlumbingStreams.parallel(stream, width, splitter, pipeline);
    }

    public static <T, R> TStream<R> parallel(TStream<T> stream, int width, ToIntFunction<T> splitter, BiFunction<TStream<T>, Integer, TStream<R>> pipeline) {
        Objects.requireNonNull(stream, "stream");
        if (width < 1) {
            throw new IllegalArgumentException("width");
        }
        Objects.requireNonNull(splitter, "splitter");
        Objects.requireNonNull(pipeline, "pipeline");
        List<TStream<T>> channels = stream.split(width, splitter);
        for (int ch = 0; ch < width; ++ch) {
            channels.set(ch, channels.get(ch).tag("parallel.split-ch" + ch));
        }
        int chBufferSize = 10;
        for (int ch = 0; ch < width; ++ch) {
            channels.set(ch, PlumbingStreams.isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch" + ch));
        }
        ArrayList results = new ArrayList(width);
        for (int ch = 0; ch < width; ++ch) {
            results.add(((TStream)pipeline.apply(channels.get(ch), (Object)ch)).tag("parallel-ch" + ch));
        }
        TStream result = ((TStream)results.get(0)).union(new HashSet(results)).tag("parallel.union");
        return PlumbingStreams.isolate(result, width);
    }

    public static <T, R> TStream<R> parallelBalanced(TStream<T> stream, int width, BiFunction<TStream<T>, Integer, TStream<R>> pipeline) {
        Objects.requireNonNull(stream, "stream");
        if (width < 1) {
            throw new IllegalArgumentException("width");
        }
        Objects.requireNonNull(pipeline, "pipeline");
        LoadBalancedSplitter splitter = new LoadBalancedSplitter(width);
        List<TStream<T>> channels = stream.split(width, splitter);
        for (int ch = 0; ch < width; ++ch) {
            channels.set(ch, channels.get(ch).tag("parallel.split-ch" + ch));
        }
        int chBufferSize = 1;
        for (int ch = 0; ch < width; ++ch) {
            channels.set(ch, PlumbingStreams.isolate(channels.get(ch), chBufferSize).tag("parallel.isolated-ch" + ch));
        }
        ArrayList results = new ArrayList(width);
        for (int ch = 0; ch < width; ++ch) {
            int finalCh = ch;
            results.add(((TStream)pipeline.apply(channels.get(ch), (Object)ch)).tag("parallel-ch" + ch).peek((Consumer & Serializable)tuple -> splitter.channelDone(finalCh)));
        }
        TStream result = ((TStream)results.get(0)).union(new HashSet(results)).tag("parallel.union");
        return PlumbingStreams.isolate(result, width);
    }

    public static <T> ToIntFunction<T> roundRobinSplitter(int width) {
        AtomicInteger cnt = new AtomicInteger();
        return (ToIntFunction & Serializable)tuple -> cnt.getAndIncrement() % width;
    }

    public static <T> TStream<T> gate(TStream<T> stream, Semaphore semaphore) {
        return stream.map((Function & Serializable)tuple -> {
            try {
                semaphore.acquire();
                return tuple;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("interrupted", e);
            }
        });
    }
}

