/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.patternmatching.support;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.examples.patternmatching.support.TransactionEvent;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * Produces a paced stream of randomly chosen {@link TransactionEvent}s for a
 * {@link SourceBuilder}-based streaming source.
 *
 * <p>Emissions are scheduled at a fixed period derived from the requested
 * rate. Each call to the fill-buffer function emits every event whose
 * scheduled time has already passed, so the generator catches up after a
 * delay instead of dropping slots.
 */
public final class TransactionGenerator {
    /** Interval between two consecutive scheduled emissions, in nanoseconds. */
    private final long emitPeriodNanos;
    /** {@link System#nanoTime()} reading taken when this generator was created. */
    private final long startTimeNanos;
    /** {@link System#nanoTime()}-based time at which the next emission is due. */
    private long scheduledTimeNanos;
    /** IDs of transactions that were started but not yet ended. */
    private final Set<Long> transactionsInProgress = new HashSet<>();
    /** ID that will be assigned to the next started transaction. */
    private long nextTransactionId;

    /**
     * @param tradesPerSec number of emission slots per second; must be positive
     * @throws IllegalArgumentException if {@code tradesPerSec} is not positive
     */
    private TransactionGenerator(int tradesPerSec) {
        if (tradesPerSec <= 0) {
            throw new IllegalArgumentException("tradesPerSec must be positive, got " + tradesPerSec);
        }
        // Rates above one billion per second would round the period down to zero,
        // which would keep the emission loop from ever terminating; clamp to 1 ns.
        this.emitPeriodNanos = Math.max(1L, TimeUnit.SECONDS.toNanos(1L) / tradesPerSec);
        this.startTimeNanos = System.nanoTime();
        this.scheduledTimeNanos = this.startTimeNanos;
    }

    /**
     * Builds a streaming source named {@code "trade-source"} that creates one
     * generator via the source's create function and uses
     * {@link #generateTrades} as its fill-buffer function.
     *
     * @param txPerSec number of emission slots per second; must be positive
     * @return the stream source emitting {@link TransactionEvent}s
     */
    public static StreamSource<TransactionEvent> transactionEventSource(int txPerSec) {
        return SourceBuilder
                .stream("trade-source", ctx -> new TransactionGenerator(txPerSec))
                .fillBufferFn(TransactionGenerator::generateTrades)
                .build();
    }

    /**
     * Adds to {@code buf} one event for every emission slot that is already due.
     *
     * <p>For each slot a random event type is picked. {@code START} opens a
     * transaction with a fresh ID; {@code END} closes one in-progress
     * transaction, or emits nothing when none is open. The event timestamp is
     * the slot's scheduled time, in milliseconds since the generator was created.
     *
     * @param buf buffer receiving the generated events
     */
    private void generateTrades(SourceBuilder.SourceBuffer<TransactionEvent> buf) {
        TransactionEvent.Type[] eventTypes = TransactionEvent.Type.values();
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        long nowNanos = System.nanoTime();
        while (this.scheduledTimeNanos <= nowNanos) {
            long timeMillis = TimeUnit.NANOSECONDS.toMillis(this.scheduledTimeNanos - this.startTimeNanos);
            TransactionEvent.Type eventType = eventTypes[rnd.nextInt(eventTypes.length)];
            // Event types other than START and END (if any exist) emit nothing for this slot.
            switch (eventType) {
                case START: {
                    // Take the current ID for this transaction, then advance the counter.
                    long transactionId = this.nextTransactionId++;
                    this.transactionsInProgress.add(transactionId);
                    buf.add(new TransactionEvent(timeMillis, transactionId, eventType));
                    break;
                }
                case END: {
                    Iterator<Long> it = this.transactionsInProgress.iterator();
                    if (!it.hasNext()) {
                        // Nothing is open, so there is nothing to end in this slot.
                        break;
                    }
                    // Close whichever in-progress transaction the set's iterator yields first.
                    long transactionId = it.next();
                    it.remove();
                    buf.add(new TransactionEvent(timeMillis, transactionId, eventType));
                    break;
                }
            }
            this.scheduledTimeNanos += this.emitPeriodNanos;
            if (this.scheduledTimeNanos > nowNanos) {
                // Apparently caught up: re-read the clock before deciding whether to stop,
                // since time has passed while the earlier slots were being emitted.
                nowNanos = System.nanoTime();
            }
        }
    }
}

