/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.tradesource;

import com.hazelcast.function.BiConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.accumulator.LongLongAccumulator;
import com.hazelcast.jet.examples.tradesource.Trade;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/**
 * Factory for a streaming source that emits randomly generated {@link Trade}
 * events at a fixed rate.
 *
 * <p>Ticker symbols are read from the classpath resource
 * {@code /nasdaqlisted.txt}: the first line is skipped and the text before the
 * first {@code |} of each following line is used as the ticker.
 */
public final class TradeSource {
    /**
     * Nanoseconds in one second. The emission period is computed as this value
     * divided by the requested rate, so it is also the highest rate for which
     * the period is still at least one nanosecond.
     */
    private static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1L);

    /**
     * Returns a trade source that uses every ticker in the resource file and
     * applies no timestamp lag.
     *
     * @param tradesPerSec number of trades to emit per second, must be positive
     * @return the streaming source
     * @throws IllegalArgumentException if {@code tradesPerSec} is out of range
     */
    @Nonnull
    public static StreamSource<Trade> tradeStream(int tradesPerSec) {
        return tradeStream(Integer.MAX_VALUE, tradesPerSec, 0);
    }

    /**
     * Returns a trade source limited to the first {@code numTickers} tickers
     * of the resource file, with no timestamp lag.
     *
     * @param numTickers   maximum number of distinct tickers, must be positive
     * @param tradesPerSec number of trades to emit per second, must be positive
     * @return the streaming source
     * @throws IllegalArgumentException if an argument is out of range
     */
    @Nonnull
    public static StreamSource<Trade> tradeStream(int numTickers, int tradesPerSec) {
        return tradeStream(numTickers, tradesPerSec, 0);
    }

    /**
     * Returns a trade source limited to the first {@code numTickers} tickers
     * of the resource file.
     *
     * @param numTickers   maximum number of distinct tickers, must be positive
     * @param tradesPerSec number of trades to emit per second, must be positive
     *                     and at most one billion (one trade per nanosecond)
     * @param maxLag       upper bound (exclusive), in milliseconds, of the
     *                     random amount subtracted from each trade's scheduled
     *                     time; values {@code <= 0} disable the lag
     * @return the streaming source
     * @throws IllegalArgumentException if {@code numTickers} or
     *                                  {@code tradesPerSec} is out of range
     */
    @Nonnull
    public static StreamSource<Trade> tradeStream(int numTickers, int tradesPerSec, int maxLag) {
        if (numTickers <= 0) {
            throw new IllegalArgumentException("Number of tickers has to be at least 1");
        }
        if (tradesPerSec <= 0) {
            throw new IllegalArgumentException("Number of trades per second has to be at least one");
        }
        if (tradesPerSec > NANOS_PER_SECOND) {
            // A higher rate would truncate the emit period to 0 ns, and the
            // generator loop would then never advance its schedule.
            throw new IllegalArgumentException(
                    "Number of trades per second cannot exceed " + NANOS_PER_SECOND);
        }
        return SourceBuilder
                .timestampedStream("trade-source",
                        x -> new TradeGenerator(numTickers, tradesPerSec, maxLag))
                .fillBufferFn(TradeGenerator::generateTrades)
                .build();
    }

    /**
     * Per-source state: the ticker list, the current price and trend of each
     * ticker, and the nanosecond schedule that paces emission.
     */
    private static final class TradeGenerator {
        /** Trade quantities are multiples of this lot size. */
        private static final int LOT = 100;
        /** Prices are tracked internally multiplied by this factor. */
        private static final long MONEY_SCALE_FACTOR = 1000L;
        /** Starting price of every ticker, in scaled units. */
        private static final long INITIAL_PRICE = 50 * MONEY_SCALE_FACTOR;
        /** Starting per-trade price change of every ticker, in scaled units. */
        private static final long INITIAL_TREND = MONEY_SCALE_FACTOR / 10;
        /** Classpath location of the ticker list. */
        private static final String TICKER_RESOURCE = "/nasdaqlisted.txt";

        private final List<String> tickers;
        private final long emitPeriodNanos;
        private final long startTimeMillis;
        private final long startTimeNanos;
        private final long maxLagNanos;
        private final Map<String, LongLongAccumulator> pricesAndTrends;
        private long scheduledTimeNanos;

        private TradeGenerator(long numTickers, int tradesPerSec, int maxLagMillis) {
            this.tickers = loadTickers(numTickers);
            this.maxLagNanos = TimeUnit.MILLISECONDS.toNanos(maxLagMillis);
            this.pricesAndTrends = this.tickers.stream().collect(Collectors.toMap(
                    t -> t, t -> new LongLongAccumulator(INITIAL_PRICE, INITIAL_TREND)));
            this.emitPeriodNanos = NANOS_PER_SECOND / tradesPerSec;
            // Capture both clocks together: nanoTime paces emission, while the
            // wall-clock reading anchors the emitted timestamps.
            this.startTimeNanos = this.scheduledTimeNanos = System.nanoTime();
            this.startTimeMillis = System.currentTimeMillis();
        }

        /**
         * Adds to {@code buf} one trade for every emission slot whose scheduled
         * time has already passed, advancing the schedule by one emit period
         * per trade.
         */
        private void generateTrades(SourceBuilder.TimestampedSourceBuffer<Trade> buf) {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();
            long nowNanos = System.nanoTime();
            // nanoTime values are compared through their difference so the
            // check stays correct even if the counter overflows.
            while (scheduledTimeNanos - nowNanos <= 0) {
                String ticker = tickers.get(rnd.nextInt(tickers.size()));
                LongLongAccumulator priceAndDelta = pricesAndTrends.get(ticker);
                long price = getNextPrice(priceAndDelta, rnd) / MONEY_SCALE_FACTOR;
                long lagNanos = maxLagNanos > 0 ? rnd.nextLong(maxLagNanos) : 0L;
                long tradeTimeNanos = scheduledTimeNanos - lagNanos;
                long tradeTimeMillis = startTimeMillis
                        + TimeUnit.NANOSECONDS.toMillis(tradeTimeNanos - startTimeNanos);
                Trade trade = new Trade(tradeTimeMillis, ticker, rnd.nextInt(1, 10) * LOT, price);
                buf.add(trade, tradeTimeMillis);
                scheduledTimeNanos += emitPeriodNanos;
                if (scheduledTimeNanos - nowNanos > 0) {
                    // Caught up with the cached reading: refresh the clock
                    // before deciding whether another slot is already due.
                    nowNanos = System.nanoTime();
                }
            }
        }

        /**
         * Advances the price held in {@code priceAndDelta} by its current
         * delta, then randomly perturbs the delta for the next call. If the
         * step would take the price to zero or below, the delta is negated
         * first.
         *
         * @return the new price, in scaled units
         */
        private static long getNextPrice(LongLongAccumulator priceAndDelta, ThreadLocalRandom rnd) {
            long price = priceAndDelta.get1();
            long delta = priceAndDelta.get2();
            if (price + delta <= 0) {
                delta = -delta;
            }
            price += delta;
            // Uniform change in [-MONEY_SCALE_FACTOR / 2, MONEY_SCALE_FACTOR / 2].
            delta += rnd.nextLong(MONEY_SCALE_FACTOR + 1) - MONEY_SCALE_FACTOR / 2;
            priceAndDelta.set1(price);
            priceAndDelta.set2(delta);
            return price;
        }

        /**
         * Reads up to {@code numTickers} ticker symbols from the resource file,
         * skipping its first line.
         *
         * @throws IllegalStateException if the resource is missing or yields no
         *                               tickers
         * @throws UncheckedIOException  if closing the resource fails
         */
        private static List<String> loadTickers(long numTickers) {
            InputStream in = TradeSource.class.getResourceAsStream(TICKER_RESOURCE);
            if (in == null) {
                throw new IllegalStateException(
                        "Classpath resource " + TICKER_RESOURCE + " not found");
            }
            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                List<String> loaded = reader.lines()
                        .skip(1L)
                        .limit(numTickers)
                        .map(line -> line.split("\\|")[0])
                        .collect(Collectors.toList());
                if (loaded.isEmpty()) {
                    // Fail here rather than later inside the generator loop.
                    throw new IllegalStateException(
                            "No tickers found in classpath resource " + TICKER_RESOURCE);
                }
                return loaded;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}

