/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.earlyresults.support;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.examples.earlyresults.support.Trade;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * Produces a rate-limited stream of randomly generated {@link Trade} items
 * for a single ticker, for use as a Jet timestamped stream source.
 *
 * <p>Trades are scheduled at a fixed period derived from the requested rate.
 * Each trade's timestamp is its scheduled offset from the generator's start,
 * in milliseconds, reduced by a random lag of less than {@link #MAX_LAG}
 * milliseconds, so the emitted timestamps are out of order. Trades whose
 * lagged timestamp would be negative are dropped.
 */
public final class TradeGenerator {
    /**
     * Exclusive upper bound, in milliseconds, of the random lag subtracted
     * from each trade's scheduled time.
     */
    public static final long MAX_LAG = 5000L;
    private static final String TICKER = "TSLA";
    private static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1L);

    /** Time between two consecutive scheduled trades, always at least 1 ns. */
    private final long emitPeriodNanos;
    /** {@link System#nanoTime()} reading taken when the generator was created. */
    private final long startTimeNanos;
    /** How long after {@link #startTimeNanos} the generator keeps emitting. */
    private final long timeoutNanos;
    /** {@link System#nanoTime()}-based instant at which the next trade is due. */
    private long scheduledTimeNanos;

    private TradeGenerator(int tradesPerSec, long timeoutSeconds) {
        checkTradesPerSec(tradesPerSec);
        this.emitPeriodNanos = NANOS_PER_SECOND / tradesPerSec;
        this.startTimeNanos = System.nanoTime();
        this.scheduledTimeNanos = this.startTimeNanos;
        // Keep the duration rather than an absolute end instant: adding a large
        // (saturated) timeout to a nanoTime reading overflows.
        this.timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSeconds);
    }

    /**
     * Builds a timestamped stream source named {@code "trade-source"} backed
     * by a {@code TradeGenerator}.
     *
     * @param tradesPerSec   number of trades scheduled per second; must be
     *                       between 1 and 1,000,000,000 inclusive
     * @param timeoutSeconds number of seconds after the generator's creation
     *                       during which it keeps scheduling trades; a value
     *                       of zero or less yields no trades
     * @return the stream source
     * @throws IllegalArgumentException if {@code tradesPerSec} is out of range
     */
    public static StreamSource<Trade> tradeSource(int tradesPerSec, long timeoutSeconds) {
        // Fail here, at pipeline construction, instead of inside the create function.
        checkTradesPerSec(tradesPerSec);
        return SourceBuilder
                .timestampedStream("trade-source", ctx -> new TradeGenerator(tradesPerSec, timeoutSeconds))
                .fillBufferFn(TradeGenerator::generateTrades)
                .build();
    }

    /**
     * Rejects rates that would make the emit period zero or negative, which
     * would either divide by zero or keep the fill loop from ever advancing.
     */
    private static void checkTradesPerSec(int tradesPerSec) {
        if (tradesPerSec <= 0 || tradesPerSec > NANOS_PER_SECOND) {
            throw new IllegalArgumentException(
                    "tradesPerSec must be between 1 and " + NANOS_PER_SECOND + ", but was " + tradesPerSec);
        }
    }

    /**
     * Adds to {@code buf} every trade whose scheduled time has been reached.
     *
     * <p>The timeout is checked once, on entry, so a single call may still
     * emit trades scheduled after the timeout if the caller fell behind.
     *
     * @param buf buffer receiving the trades together with their timestamps
     */
    private void generateTrades(SourceBuilder.TimestampedSourceBuffer<Trade> buf) {
        // nanoTime readings are only meaningful as differences, so compare
        // elapsed time instead of absolute values.
        if (scheduledTimeNanos - startTimeNanos >= timeoutNanos) {
            return;
        }
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        long nowNanos = System.nanoTime();
        while (nowNanos - scheduledTimeNanos >= 0L) {
            long scheduledOffsetMillis = TimeUnit.NANOSECONDS.toMillis(scheduledTimeNanos - startTimeNanos);
            long tradeTimeMillis = scheduledOffsetMillis - rnd.nextLong(MAX_LAG);
            // A lag larger than the elapsed time would give a negative timestamp; skip it.
            if (tradeTimeMillis >= 0L) {
                // NOTE(review): the last two arguments are presumably quantity and
                // price - confirm against the Trade constructor.
                Trade trade = new Trade(tradeTimeMillis, TICKER, rnd.nextInt(30), 28000 + rnd.nextInt(5000));
                buf.add(trade, tradeTimeMillis);
            }
            scheduledTimeNanos += emitPeriodNanos;
            if (nowNanos - scheduledTimeNanos < 0L) {
                // We appear to have caught up; refresh the clock before deciding to stop.
                nowNanos = System.nanoTime();
            }
        }
    }
}

