/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.pipeline.test;

import com.hazelcast.function.BiConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.annotation.EvolvingApi;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.test.GeneratorFunction;
import com.hazelcast.jet.pipeline.test.SimpleEvent;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/**
 * Static factory methods for simple batch and stream sources that emit
 * caller-supplied or generated items. The class cannot be instantiated.
 */
@EvolvingApi
public final class TestSources {
    private TestSources() {
    }

    /**
     * Returns a batch source that adds every element of {@code items} to the
     * source buffer and then closes the buffer.
     * <p>
     * The iterable is captured by the fill-buffer function.
     * NOTE(review): it presumably has to be serializable to be shipped with
     * the job definition; confirm against the {@code SourceBuilder} contract.
     *
     * @param items the elements to emit, must not be {@code null}
     * @param <T>   type of the emitted items
     * @return a batch source named {@code "items"}
     * @throws NullPointerException if {@code items} is {@code null}
     */
    @Nonnull
    public static <T> BatchSource<T> items(@Nonnull Iterable<? extends T> items) {
        Objects.requireNonNull(items, "items");
        // The source keeps no state of its own, hence the null context object.
        return SourceBuilder.batch("items", ctx -> null)
                .<T>fillBufferFn((ignored, buf) -> {
                    items.forEach(buf::add);
                    buf.close();
                })
                .build();
    }

    /**
     * Varargs convenience overload of {@link #items(Iterable)}: wraps the
     * array with {@link Arrays#asList(Object[])} and delegates.
     *
     * @param items the elements to emit, must not be {@code null}
     * @param <T>   type of the emitted items
     * @return a batch source named {@code "items"}
     * @throws NullPointerException if the {@code items} array is {@code null}
     */
    @SafeVarargs
    @Nonnull
    public static <T> BatchSource<T> items(T... items) {
        Objects.requireNonNull(items, "items");
        return TestSources.items(Arrays.asList(items));
    }

    /**
     * Returns a streaming source that emits {@link SimpleEvent}s at the
     * requested rate. Each event is created by the {@code SimpleEvent}
     * constructor from the timestamp and the sequence number handed to the
     * generator (see {@link #itemStream(int, GeneratorFunction)}).
     *
     * @param itemsPerSecond target number of events per second, must be positive
     * @return a timestamped stream source named {@code "itemStream"}
     * @throws IllegalArgumentException if {@code itemsPerSecond} is not positive
     */
    @EvolvingApi
    @Nonnull
    public static StreamSource<SimpleEvent> itemStream(int itemsPerSecond) {
        return TestSources.itemStream(itemsPerSecond, SimpleEvent::new);
    }

    /**
     * Returns a streaming source that emits items created by
     * {@code generatorFn} at the requested rate.
     * <p>
     * The generator is invoked with the item's timestamp (wall-clock
     * milliseconds, rounded down to the emission period) and a sequence
     * number that starts at 0 and grows by one per generated item. The item
     * is added to the buffer with that same timestamp.
     *
     * @param itemsPerSecond target number of items per second, must be positive
     * @param generatorFn    function creating the items, must be serializable
     * @param <T>            type of the generated items
     * @return a timestamped stream source named {@code "itemStream"}
     * @throws NullPointerException     if {@code generatorFn} is {@code null}
     * @throws IllegalArgumentException if {@code itemsPerSecond} is not positive
     */
    @EvolvingApi
    @Nonnull
    public static <T> StreamSource<T> itemStream(int itemsPerSecond, @Nonnull GeneratorFunction<? extends T> generatorFn) {
        Objects.requireNonNull(generatorFn, "generatorFn");
        // Fail fast here: a zero rate would otherwise surface as a division by
        // zero when the source object is created, and a negative rate would
        // make the emit schedule run backwards and emit full batches on every call.
        if (itemsPerSecond <= 0) {
            throw new IllegalArgumentException("itemsPerSecond must be positive, but was " + itemsPerSecond);
        }
        Util.checkSerializable(generatorFn, "generatorFn");
        return SourceBuilder.timestampedStream("itemStream", ctx -> new ItemStreamSource<T>(itemsPerSecond, generatorFn))
                .<T>fillBufferFn(ItemStreamSource::fillBuffer)
                .build();
    }

    /**
     * Per-source state of the item stream: paces the generator so that one
     * item is due every {@code periodNanos} nanoseconds.
     *
     * @param <T> type of the generated items
     */
    private static final class ItemStreamSource<T> {
        /** Upper bound on the number of items added in one {@code fillBuffer} call. */
        private static final int MAX_BATCH_SIZE = 1024;
        private final GeneratorFunction<? extends T> generator;
        /** Interval between two consecutive items, in nanoseconds; always at least 1. */
        private final long periodNanos;
        /** {@code System.nanoTime()} value at which the next item is due. */
        private long emitSchedule;
        /** Whether {@code emitSchedule} has been anchored by the first {@code fillBuffer} call. */
        private boolean scheduleStarted;
        /** Sequence number passed to the generator for the next item. */
        private long sequence;

        private ItemStreamSource(int itemsPerSecond, GeneratorFunction<? extends T> generator) {
            // Rates above one item per nanosecond would truncate the period to 0,
            // which breaks the modulo in fillBuffer; clamp to the finest period.
            this.periodNanos = Math.max(1L, TimeUnit.SECONDS.toNanos(1L) / (long) itemsPerSecond);
            this.generator = generator;
        }

        /**
         * Adds all items that are due according to the emit schedule, but at
         * most {@code MAX_BATCH_SIZE} of them. All items added in one call
         * share the same timestamp.
         *
         * @param buf buffer to add the generated items to
         * @throws Exception if the generator function throws
         */
        void fillBuffer(SourceBuilder.TimestampedSourceBuffer<T> buf) throws Exception {
            long nowNs = System.nanoTime();
            if (!scheduleStarted) {
                // Explicit flag instead of an "emitSchedule == 0" sentinel:
                // zero is a legitimate nanoTime value.
                emitSchedule = nowNs;
                scheduleStarted = true;
            }
            // Round the wall-clock time down to a multiple of the period.
            long tsNanos = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
            long ts = TimeUnit.NANOSECONDS.toMillis(tsNanos - tsNanos % periodNanos);
            // nanoTime values must be compared by difference to be overflow-safe.
            for (int i = 0; i < MAX_BATCH_SIZE && nowNs - emitSchedule >= 0; i++) {
                T item = generator.generate(ts, sequence++);
                buf.add(item, ts);
                emitSchedule += periodNanos;
            }
        }
    }
}

