/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.cogroup;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.function.ToLongFunctionEx;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.datamodel.ItemsByTag;
import com.hazelcast.jet.datamodel.Tag;
import com.hazelcast.jet.examples.cogroup.datamodel.AddToCart;
import com.hazelcast.jet.examples.cogroup.datamodel.PageVisit;
import com.hazelcast.jet.examples.cogroup.datamodel.Payment;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StageWithKeyAndWindow;
import com.hazelcast.jet.pipeline.StreamStageWithKey;
import com.hazelcast.jet.pipeline.WindowDefinition;
import com.hazelcast.jet.pipeline.WindowGroupAggregateBuilder;
import com.hazelcast.map.IMap;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public final class WindowedCoGroup {
    private static final String TOPIC = "topic";
    private static final String PAGE_VISIT = "pageVisit";
    private static final String ADD_TO_CART = "addToCart";
    private static final String PAYMENT = "payment";

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws Exception {
        System.setProperty("hazelcast.partition.count", "1");
        JetInstance jet = Jet.bootstrappedInstance();
        ProducerTask producer = new ProducerTask(jet);
        try {
            Pipeline p = WindowedCoGroup.coGroupWithBuilder();
            Job job = jet.newJob(p);
            Thread.sleep(5000L);
            producer.stop();
            job.cancel();
        }
        finally {
            producer.stop();
            Jet.shutdownAll();
        }
    }

    private static Pipeline aggregate() {
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.mapJournal((String)PAGE_VISIT, (JournalInitialPosition)JournalInitialPosition.START_FROM_OLDEST, (FunctionEx)Util.mapEventNewValue(), (PredicateEx)Util.mapPutEvents())).withTimestamps((ToLongFunctionEx & Serializable)pv -> pv.timestamp(), 100L).window((WindowDefinition)WindowDefinition.sliding((long)10L, (long)1L)).aggregate(AggregateOperations.counting()).writeTo(Sinks.logger());
        return p;
    }

    private static Pipeline groupAndAggregate() {
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.mapJournal((String)PAGE_VISIT, (JournalInitialPosition)JournalInitialPosition.START_FROM_OLDEST, (FunctionEx)Util.mapEventNewValue(), (PredicateEx)Util.mapPutEvents())).withTimestamps((ToLongFunctionEx & Serializable)pv -> pv.timestamp(), 100L).window((WindowDefinition)WindowDefinition.sliding((long)10L, (long)1L)).groupingKey((FunctionEx & Serializable)pv -> pv.userId()).aggregate(AggregateOperations.toList()).writeTo(Sinks.logger());
        return p;
    }

    private static Pipeline coGroupAndAggregate() {
        Pipeline p = Pipeline.create();
        StreamStageWithKey pageVisits = p.readFrom(Sources.mapJournal((String)PAGE_VISIT, (JournalInitialPosition)JournalInitialPosition.START_FROM_OLDEST, (FunctionEx)Util.mapEventNewValue(), (PredicateEx)Util.mapPutEvents())).withTimestamps((ToLongFunctionEx & Serializable)pv -> pv.timestamp(), 100L).groupingKey((FunctionEx & Serializable)pv -> pv.userId());
        StreamStageWithKey payments = p.readFrom(Sources.mapJournal((String)PAYMENT, (JournalInitialPosition)JournalInitialPosition.START_FROM_OLDEST, (FunctionEx)Util.mapEventNewValue(), (PredicateEx)Util.mapPutEvents())).withTimestamps((ToLongFunctionEx & Serializable)pm -> pm.timestamp(), 100L).groupingKey((FunctionEx & Serializable)pm -> pm.userId());
        StreamStageWithKey addToCarts = p.readFrom(Sources.mapJournal((String)ADD_TO_CART, (JournalInitialPosition)JournalInitialPosition.START_FROM_OLDEST, (FunctionEx)Util.mapEventNewValue(), (PredicateEx)Util.mapPutEvents())).withTimestamps((ToLongFunctionEx & Serializable)atc -> atc.timestamp(), 100L).groupingKey((FunctionEx & Serializable)atc -> atc.userId());
        StageWithKeyAndWindow windowStage = pageVisits.window((WindowDefinition)WindowDefinition.sliding((long)10L, (long)1L));
        windowStage.aggregate3(AggregateOperations.counting(), addToCarts, AggregateOperations.counting(), payments, AggregateOperations.counting()).writeTo(Sinks.logger());
        return p;
    }

    private static Pipeline coGroupWithBuilder() {
        Pipeline p = Pipeline.create();
        StreamStageWithKey pageVisits = p.readFrom(Sources.mapJournal((String)PAGE_VISIT, (JournalInitialPosition)JournalInitialPosition.START_FROM_OLDEST, (FunctionEx)Util.mapEventNewValue(), (PredicateEx)Util.mapPutEvents())).withTimestamps((ToLongFunctionEx & Serializable)pv -> pv.timestamp(), 100L).groupingKey((FunctionEx & Serializable)pv -> pv.userId());
        StreamStageWithKey addToCarts = p.readFrom(Sources.mapJournal((String)ADD_TO_CART, (JournalInitialPosition)JournalInitialPosition.START_FROM_OLDEST, (FunctionEx)Util.mapEventNewValue(), (PredicateEx)Util.mapPutEvents())).withTimestamps((ToLongFunctionEx & Serializable)atc -> atc.timestamp(), 100L).groupingKey((FunctionEx & Serializable)atc -> atc.userId());
        StreamStageWithKey payments = p.readFrom(Sources.mapJournal((String)PAYMENT, (JournalInitialPosition)JournalInitialPosition.START_FROM_OLDEST, (FunctionEx)Util.mapEventNewValue(), (PredicateEx)Util.mapPutEvents())).withTimestamps((ToLongFunctionEx & Serializable)pm -> pm.timestamp(), 100L).groupingKey((FunctionEx & Serializable)pm -> pm.userId());
        StageWithKeyAndWindow windowStage = pageVisits.window((WindowDefinition)WindowDefinition.sliding((long)10L, (long)1L));
        WindowGroupAggregateBuilder builder = windowStage.aggregateBuilder(AggregateOperations.counting());
        Tag pageVisitTag = builder.tag0();
        Tag addToCartTag = builder.add(addToCarts, AggregateOperations.counting());
        Tag paymentTag = builder.add(payments, AggregateOperations.counting());
        builder.build().writeTo(Sinks.logger((FunctionEx & Serializable)r -> {
            ItemsByTag items = (ItemsByTag)r.result();
            return String.format("window(%s..%s): id %d%npageVisits %s%naddToCarts %s%npayments %s", com.hazelcast.jet.impl.util.Util.toLocalTime((long)r.start()), com.hazelcast.jet.impl.util.Util.toLocalTime((long)r.end()), r.getKey(), items.get(pageVisitTag), items.get(addToCartTag), items.get(paymentTag));
        }));
        return p;
    }

    private static class ProducerTask
    implements Runnable {
        private final IMap<Object, PageVisit> pageVisit;
        private final IMap<Object, AddToCart> addToCart;
        private final IMap<Object, Payment> payment;
        private volatile boolean keepGoing = true;
        private int loadTime = 1;
        private int quantity = 21;
        private int amount = 31;
        private long now = System.currentTimeMillis();

        ProducerTask(JetInstance jet) {
            this.pageVisit = jet.getMap(WindowedCoGroup.PAGE_VISIT);
            this.addToCart = jet.getMap(WindowedCoGroup.ADD_TO_CART);
            this.payment = jet.getMap(WindowedCoGroup.PAYMENT);
            new Thread((Runnable)this, "WindowedCoGroup Producer").start();
        }

        @Override
        public void run() {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100L));
            while (this.keepGoing) {
                this.produceSampleData();
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
                ++this.now;
            }
        }

        public void stop() {
            this.keepGoing = false;
        }

        private void produceSampleData() {
            for (int userId = 11; userId < 13; ++userId) {
                for (int i = 0; i < 2; ++i) {
                    this.pageVisit.set((Object)WindowedCoGroup.TOPIC, (Object)new PageVisit(this.now, userId, this.loadTime));
                    this.addToCart.set((Object)WindowedCoGroup.TOPIC, (Object)new AddToCart(this.now, userId, this.quantity));
                    this.payment.set((Object)WindowedCoGroup.TOPIC, (Object)new Payment(this.now, userId, this.amount));
                    ++this.loadTime;
                    ++this.quantity;
                    ++this.amount;
                }
            }
        }
    }
}

