/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.earlyresults;

import com.hazelcast.collection.IList;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.datamodel.WindowResult;
import com.hazelcast.jet.examples.earlyresults.TradingVolumeGui;
import com.hazelcast.jet.examples.tradesource.Trade;
import com.hazelcast.jet.examples.tradesource.TradeSource;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;

/**
 * Example Jet job that sums the traded quantity of a generated trade stream
 * over tumbling two-second windows and appends every window result, early
 * results included, to the {@code "trading-volume"} list. That same list is
 * handed to a {@link TradingVolumeGui}.
 */
public final class TradingVolumeOverTime {
    /** Name of the list the pipeline writes to and the GUI is given. */
    private static final String VOLUME_LIST_NAME = "trading-volume";
    /** Passed as the second argument of {@code TradeSource.tradeStream}. */
    private static final int TRADES_PER_SEC = 3000;
    /**
     * Passed both to {@code TradeSource.tradeStream} and as the allowed lag of
     * the native timestamps, so the two always agree.
     */
    private static final int MAX_LAG = 5000;
    /** How long {@link #main} lets the job run before cancelling it. */
    private static final int DURATION_SECONDS = 55;

    /**
     * Builds the pipeline: trade source, native timestamps, tumbling window
     * with early results, sum of {@code Trade::getQuantity}, list sink.
     *
     * @return the assembled pipeline, ready to be submitted as a job
     */
    private static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        // NOTE(review): the first tradeStream argument (1) is presumably the
        // number of ticker symbols; confirm against TradeSource.
        p.readFrom(TradeSource.tradeStream(1, TRADES_PER_SEC, MAX_LAG))
         .withNativeTimestamps(MAX_LAG)
         // NOTE(review): the early-results period (20) is presumably in
         // milliseconds, like the window size; confirm against WindowDefinition.
         .window(WindowDefinition.tumbling(TimeUnit.SECONDS.toMillis(2L))
                                 .setEarlyResultsPeriod(20L))
         .aggregate(AggregateOperations.summingLong(Trade::getQuantity))
         .writeTo(Sinks.list(VOLUME_LIST_NAME));
        return p;
    }

    /**
     * Creates the GUI, submits the job, lets it run for
     * {@code DURATION_SECONDS} seconds, cancels it, waits for it to finish and
     * finally shuts down all Jet instances.
     *
     * @param args ignored
     * @throws Exception if the thread is interrupted while waiting or the job
     *                   fails for a reason other than the cancellation
     *                   requested here
     */
    public static void main(String[] args) throws Exception {
        JetInstance jet = Jet.bootstrappedInstance();
        try {
            // Done inside the try so that a failure here still shuts Jet down.
            IList<WindowResult<Long>> volumes = jet.getList(VOLUME_LIST_NAME);
            new TradingVolumeGui(volumes);

            Job job = jet.newJob(buildPipeline());
            TimeUnit.SECONDS.sleep(DURATION_SECONDS);
            job.cancel();
            try {
                job.join();
            } catch (CancellationException expected) {
                // join() reports the cancellation requested just above by
                // throwing; that is the normal way for this example to end.
            }
        } finally {
            Jet.shutdownAll();
        }
    }
}

