/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.cogroup;

import com.hazelcast.core.IList;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.IListJet;
import com.hazelcast.jet.IMapJet;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.datamodel.ItemsByTag;
import com.hazelcast.jet.datamodel.Tag;
import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.examples.cogroup.datamodel.AddToCart;
import com.hazelcast.jet.examples.cogroup.datamodel.PageVisit;
import com.hazelcast.jet.examples.cogroup.datamodel.Payment;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.BatchStageWithKey;
import com.hazelcast.jet.pipeline.GroupAggregateBuilder;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Co-groups three lists (page visits, add-to-cart events and payments) by
 * user ID and validates the output against expectations recorded while the
 * sample data was generated.
 *
 * <p>Two equivalent pipelines are built and run one after the other: one
 * using {@code aggregate3} directly and one using the aggregate builder.
 * Both write {@code userId -> (pageVisits, addToCarts, payments)} entries to
 * the {@value #RESULT} map.
 */
public final class BatchCoGroup {
    private static final String PAGE_VISIT = "pageVisit";
    private static final String ADD_TO_CART = "addToCart";
    private static final String PAYMENT = "payment";
    private static final String RESULT = "result";

    /** First user ID (inclusive) for which sample data is generated and validated. */
    private static final int FIRST_USER_ID = 11;
    /** Upper bound (exclusive) of the user IDs that are generated and validated. */
    private static final int USER_ID_LIMIT = 13;
    /** Number of events of each kind generated per user. */
    private static final int EVENTS_PER_USER = 2;

    private final JetInstance jet;
    // Expected contents of the co-group result, filled in by prepareSampleData().
    private final Map<Integer, Set<PageVisit>> userId2PageVisit = new HashMap<>();
    private final Map<Integer, Set<AddToCart>> userId2AddToCart = new HashMap<>();
    private final Map<Integer, Set<Payment>> userId2Payment = new HashMap<>();

    private BatchCoGroup(JetInstance jet) {
        this.jet = jet;
    }

    /**
     * Builds a pipeline that reads the three source lists, keys each by user
     * ID and co-groups them with a single {@code aggregate3} call, collecting
     * every stream into a list. The result is drained to the {@value #RESULT} map.
     *
     * @return the assembled pipeline
     */
    private static Pipeline coGroupDirect() {
        Pipeline p = Pipeline.create();
        BatchStageWithKey<PageVisit, Integer> pageVisits =
                p.drawFrom(Sources.<PageVisit>list(PAGE_VISIT))
                 .groupingKey(pageVisit -> pageVisit.userId());
        BatchStageWithKey<AddToCart, Integer> addToCarts =
                p.drawFrom(Sources.<AddToCart>list(ADD_TO_CART))
                 .groupingKey(addToCart -> addToCart.userId());
        BatchStageWithKey<Payment, Integer> payments =
                p.drawFrom(Sources.<Payment>list(PAYMENT))
                 .groupingKey(payment -> payment.userId());

        BatchStage<Map.Entry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> coGrouped =
                pageVisits.aggregate3(
                        AggregateOperations.toList(),
                        addToCarts, AggregateOperations.toList(),
                        payments, AggregateOperations.toList());
        coGrouped.drainTo(Sinks.map(RESULT));
        return p;
    }

    /**
     * Builds a pipeline equivalent to {@link #coGroupDirect()} but assembled
     * through the aggregate builder: each contributing stage is registered
     * with the builder, which hands back a tag used to look up that stage's
     * list in the {@link ItemsByTag} result. The tagged lists are repackaged
     * into a {@link Tuple3} and drained to the {@value #RESULT} map.
     *
     * @return the assembled pipeline
     */
    private static Pipeline coGroupBuild() {
        Pipeline p = Pipeline.create();
        BatchStageWithKey<PageVisit, Integer> pageVisits =
                p.drawFrom(Sources.<PageVisit>list(PAGE_VISIT))
                 .groupingKey(pageVisit -> pageVisit.userId());
        BatchStageWithKey<AddToCart, Integer> addToCarts =
                p.drawFrom(Sources.<AddToCart>list(ADD_TO_CART))
                 .groupingKey(addToCart -> addToCart.userId());
        BatchStageWithKey<Payment, Integer> payments =
                p.drawFrom(Sources.<Payment>list(PAYMENT))
                 .groupingKey(payment -> payment.userId());

        GroupAggregateBuilder<Integer, List<PageVisit>> builder =
                pageVisits.aggregateBuilder(AggregateOperations.toList());
        Tag<List<PageVisit>> visitTag = builder.tag0();
        Tag<List<AddToCart>> cartTag = builder.add(addToCarts, AggregateOperations.toList());
        Tag<List<Payment>> payTag = builder.add(payments, AggregateOperations.toList());

        BatchStage<Map.Entry<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>>> coGrouped =
                builder.build().map(keyAndVals -> {
                    ItemsByTag ibt = keyAndVals.getValue();
                    return Util.entry(
                            keyAndVals.getKey(),
                            Tuple3.tuple3(ibt.get(visitTag), ibt.get(cartTag), ibt.get(payTag)));
                });
        coGrouped.drainTo(Sinks.map(RESULT));
        return p;
    }

    /**
     * Starts the Jet instances and runs the example.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        JetInstance jet = Jet.newJetInstance();
        // A second instance is started and its handle discarded.
        // NOTE(review): presumably this is so the jobs run on a two-member cluster - confirm.
        Jet.newJetInstance();
        new BatchCoGroup(jet).go();
    }

    /**
     * Populates the sample data, runs both co-group pipelines (clearing the
     * result map in between) and validates the output after each run. All Jet
     * instances are shut down afterwards, whether or not any step failed.
     */
    private void go() {
        try {
            // Inside the try so that a failure while populating the lists
            // still reaches Jet.shutdownAll().
            prepareSampleData();
            jet.newJob(coGroupDirect()).join();
            validateCoGroupResults();
            jet.getMap(RESULT).clear();
            jet.newJob(coGroupBuild()).join();
            validateCoGroupResults();
        } finally {
            Jet.shutdownAll();
        }
    }

    /**
     * Prints the result map and checks, for every generated user ID, that the
     * co-grouped lists match the expectations recorded by
     * {@link #prepareSampleData()}.
     *
     * @throws AssertionError if a user ID has no result entry or any of its
     *                        three collections differs from the expectation
     */
    private void validateCoGroupResults() {
        IMapJet<Integer, Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>>> result = jet.getMap(RESULT);
        printImap(result);
        for (int userId = FIRST_USER_ID; userId < USER_ID_LIMIT; ++userId) {
            Tuple3<List<PageVisit>, List<AddToCart>, List<Payment>> r = result.get(userId);
            if (r == null) {
                // Fail with a descriptive message instead of a NullPointerException.
                throw new AssertionError("Mismatch: no result entry for userId " + userId);
            }
            assertEqual(userId2PageVisit.get(userId), r.f0());
            assertEqual(userId2AddToCart.get(userId), r.f1());
            assertEqual(userId2Payment.get(userId), r.f2());
        }
        System.out.println("BatchCoGroup results are valid");
    }

    /**
     * Fills the three source lists with {@value #EVENTS_PER_USER} events of
     * each kind per user ID, records the same events in the expectation maps
     * and prints the lists.
     */
    private void prepareSampleData() {
        IListJet<AddToCart> addToCartList = jet.getList(ADD_TO_CART);
        IListJet<Payment> paymentList = jet.getList(PAYMENT);
        IListJet<PageVisit> pageVisitList = jet.getList(PAGE_VISIT);

        int quantity = 21;
        int amount = 31;
        int loadTime = 1;
        long timestamp = System.currentTimeMillis();
        for (int userId = FIRST_USER_ID; userId < USER_ID_LIMIT; ++userId) {
            Set<AddToCart> expectedAddToCarts = new HashSet<>();
            Set<Payment> expectedPayments = new HashSet<>();
            Set<PageVisit> expectedPageVisits = new HashSet<>();
            userId2AddToCart.put(userId, expectedAddToCarts);
            userId2Payment.put(userId, expectedPayments);
            userId2PageVisit.put(userId, expectedPageVisits);
            for (int i = 0; i < EVENTS_PER_USER; ++i) {
                PageVisit visit = new PageVisit(timestamp, userId, loadTime);
                AddToCart atc = new AddToCart(timestamp, userId, quantity);
                Payment pay = new Payment(timestamp, userId, amount);
                addToCartList.add(atc);
                paymentList.add(pay);
                pageVisitList.add(visit);
                expectedAddToCarts.add(atc);
                expectedPayments.add(pay);
                expectedPageVisits.add(visit);
                // Make every generated event distinct from the previous one.
                ++loadTime;
                ++quantity;
                ++amount;
                timestamp += 1000L;
            }
        }
        printIList(addToCartList);
        printIList(paymentList);
        printIList(pageVisitList);
    }

    /**
     * Checks that {@code actual} has the same size as {@code expected} and
     * that every element of {@code actual} is contained in {@code expected}.
     *
     * @param expected the expected elements
     * @param actual   the elements produced by the job
     * @throws AssertionError if the sizes differ or {@code actual} holds an
     *                        element missing from {@code expected}
     */
    private static <T> void assertEqual(Set<T> expected, Collection<T> actual) {
        if (actual.size() != expected.size() || !expected.containsAll(actual)) {
            throw new AssertionError("Mismatch: expected " + expected + "; actual " + actual);
        }
    }

    /**
     * Prints the map's name followed by one {@code key->value} line per entry.
     *
     * @param imap the map to print
     */
    private static <K, V> void printImap(IMap<K, V> imap) {
        StringBuilder sb = new StringBuilder();
        System.out.println(imap.getName() + ':');
        imap.forEach((k, v) -> sb.append(k).append("->").append(v).append('\n'));
        System.out.println(sb);
    }

    /**
     * Prints the list's name followed by one line per element.
     *
     * @param list the list to print
     */
    private static void printIList(IList<?> list) {
        StringBuilder sb = new StringBuilder();
        System.out.println(list.getName() + ':');
        list.forEach(e -> sb.append(e).append('\n'));
        System.out.println(sb);
    }
}

