/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.faulttolerance;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.examples.faulttolerance.PriceUpdateEvent;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.WindowDefinition;
import com.hazelcast.map.IMap;
import java.io.Serializable;

public class FaultTolerance {
    private static final String PRICES_MAP_NAME = "prices";
    private static final String[] TICKER_LIST = new String[]{"AAPL", "AMZN", "EBAY", "GOOG", "MSFT", "TSLA"};
    private static final long LAG_SECONDS = 5L;
    private static final long WINDOW_SIZE_SECONDS = 10L;
    private static final long SHUTDOWN_DELAY_SECONDS = 20L;

    public static void main(String[] args) throws Exception {
        JobConfig config = new JobConfig();
        config.setProcessingGuarantee(ProcessingGuarantee.NONE);
        config.setSnapshotIntervalMillis(2000L);
        JetInstance instance1 = Jet.newJetInstance();
        JetInstance instance2 = Jet.newJetInstance();
        JetInstance client = Jet.newJetClient();
        Job job = client.newJob(FaultTolerance.buildPipeline(), config);
        Thread.sleep(1000L);
        System.out.println("******************************************");
        System.out.println("Starting price updater. You should start seeing the output after 5 seconds");
        System.out.println("After 20 seconds, one of the nodes will be shut down.");
        System.out.println("******************************************");
        new Thread(() -> FaultTolerance.updatePrices(instance1)).start();
        Thread.sleep(20000L);
        instance2.shutdown();
        System.out.println("Member shut down, the job will now restart and you can inspect the output again.");
    }

    private static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.mapJournal((String)PRICES_MAP_NAME, (JournalInitialPosition)JournalInitialPosition.START_FROM_CURRENT, (FunctionEx & Serializable)e -> new PriceUpdateEvent((String)e.getKey(), (Integer)((Tuple2)e.getNewValue()).f0(), (Long)((Tuple2)e.getNewValue()).f1()), (PredicateEx)Util.mapPutEvents())).withTimestamps(PriceUpdateEvent::timestamp, 5000L).setLocalParallelism(1).groupingKey(PriceUpdateEvent::ticker).window((WindowDefinition)WindowDefinition.sliding((long)10000L, (long)1000L)).aggregate(AggregateOperations.counting()).writeTo(Sinks.logger());
        return p;
    }

    private static void updatePrices(JetInstance jet) {
        IMap prices = jet.getMap(PRICES_MAP_NAME);
        int price = 100;
        long timestamp = 0L;
        while (true) {
            for (String ticker : TICKER_LIST) {
                prices.put((Object)ticker, (Object)Tuple2.tuple2((Object)price, (Object)timestamp));
            }
            ++price;
            timestamp += 1000L;
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                return;
            }
        }
    }
}

