/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.enrichment;

import com.hazelcast.core.IMap;
import com.hazelcast.core.ReplicatedMap;
import com.hazelcast.jet.IMapJet;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.examples.enrichment.EventGenerator;
import com.hazelcast.jet.examples.enrichment.datamodel.Broker;
import com.hazelcast.jet.examples.enrichment.datamodel.Product;
import com.hazelcast.jet.examples.enrichment.datamodel.Trade;
import com.hazelcast.jet.function.BiFunctionEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.Functions;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.JoinClause;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamStage;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.stream.Stream;

public final class Enrichment {
    private static final String TRADES = "trades";
    private static final String PRODUCTS = "products";
    private static final String BROKERS = "brokers";
    private final JetInstance jet;

    private Enrichment(JetInstance jet) {
        this.jet = jet;
    }

    private Pipeline enrichUsingIMap() {
        IMapJet productMap = this.jet.getMap(PRODUCTS);
        Enrichment.readLines("products.txt").forEach(e -> {
            Product cfr_ignored_0 = (Product)productMap.put(e.getKey(), (Object)new Product((Integer)e.getKey(), (String)e.getValue()));
        });
        System.out.println("Loaded product map:");
        Enrichment.printMap(productMap);
        IMapJet brokerMap = this.jet.getMap(BROKERS);
        Enrichment.readLines("brokers.txt").forEach(e -> {
            Broker cfr_ignored_0 = (Broker)brokerMap.put(e.getKey(), (Object)new Broker((Integer)e.getKey(), (String)e.getValue()));
        });
        System.out.println("Loaded brokers map:");
        Enrichment.printMap(brokerMap);
        Pipeline p = Pipeline.create();
        StreamStage trades = p.drawFrom(Sources.mapJournal((String)TRADES, (JournalInitialPosition)JournalInitialPosition.START_FROM_CURRENT)).withoutTimestamps().map(Functions.entryValue());
        trades.mapUsingIMap((IMap)productMap, (FunctionEx & Serializable)trade -> trade.productId(), (BiFunctionEx & Serializable)(t, product) -> Tuple2.tuple2((Object)t, (Object)product.name())).mapUsingIMap((IMap)brokerMap, (FunctionEx & Serializable)t -> ((Trade)t.f0()).brokerId(), (BiFunctionEx & Serializable)(t, broker) -> Tuple3.tuple3((Object)t.f0(), (Object)t.f1(), (Object)broker.name())).drainTo(Sinks.logger());
        return p;
    }

    private Pipeline enrichUsingReplicatedMap() {
        ReplicatedMap productMap = this.jet.getReplicatedMap(PRODUCTS);
        Enrichment.readLines("products.txt").forEach(e -> {
            Product cfr_ignored_0 = (Product)productMap.put(e.getKey(), (Object)new Product((Integer)e.getKey(), (String)e.getValue()));
        });
        System.out.println("Loaded product replicated map:");
        Enrichment.printMap(productMap);
        ReplicatedMap brokerMap = this.jet.getReplicatedMap(BROKERS);
        Enrichment.readLines("brokers.txt").forEach(e -> {
            Broker cfr_ignored_0 = (Broker)brokerMap.put(e.getKey(), (Object)new Broker((Integer)e.getKey(), (String)e.getValue()));
        });
        System.out.println("Loaded brokers replicated map:");
        Enrichment.printMap(brokerMap);
        Pipeline p = Pipeline.create();
        StreamStage trades = p.drawFrom(Sources.mapJournal((String)TRADES, (JournalInitialPosition)JournalInitialPosition.START_FROM_CURRENT)).withoutTimestamps().map(Functions.entryValue());
        trades.mapUsingReplicatedMap(productMap, Trade::productId, (BiFunctionEx & Serializable)(t, product) -> Tuple2.tuple2((Object)t, (Object)product.name())).mapUsingReplicatedMap(brokerMap, (FunctionEx & Serializable)t -> ((Trade)t.f0()).brokerId(), (BiFunctionEx & Serializable)(t, broker) -> Tuple3.tuple3((Object)t.f0(), (Object)t.f1(), (Object)broker.name())).drainTo(Sinks.logger());
        return p;
    }

    private static Pipeline enrichUsingHashJoin() {
        Pipeline p = Pipeline.create();
        StreamStage trades = p.drawFrom(Sources.mapJournal((String)TRADES, (JournalInitialPosition)JournalInitialPosition.START_FROM_CURRENT)).withoutTimestamps().map(Functions.entryValue());
        String resourcesPath = Enrichment.getClasspathDirectory(".").toString();
        BatchSource products = Sources.filesBuilder((String)resourcesPath).sharedFileSystem(true).glob("products.txt").build((BiFunctionEx & Serializable)(file, line) -> {
            Map.Entry<Integer, String> split = Enrichment.splitLine(line);
            return Util.entry((Object)split.getKey(), (Object)new Product(split.getKey(), split.getValue()));
        });
        BatchSource brokers = Sources.filesBuilder((String)resourcesPath).sharedFileSystem(true).glob("brokers.txt").build((BiFunctionEx & Serializable)(file, line) -> {
            Map.Entry<Integer, String> split = Enrichment.splitLine(line);
            return Util.entry((Object)split.getKey(), (Object)new Broker(split.getKey(), split.getValue()));
        });
        BatchStage prodEntries = p.drawFrom(products);
        BatchStage brokEntries = p.drawFrom(brokers);
        trades.hashJoin2(prodEntries, JoinClause.joinMapEntries(Trade::productId), brokEntries, JoinClause.joinMapEntries(Trade::brokerId), Tuple3::tuple3).drainTo(Sinks.logger());
        return p;
    }

    public static void main(String[] args) throws Exception {
        JetInstance jet = Jet.newJetInstance();
        Jet.newJetInstance();
        new Enrichment(jet).go();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void go() throws Exception {
        EventGenerator eventGenerator = new EventGenerator((IMap<Object, Trade>)this.jet.getMap(TRADES));
        eventGenerator.start();
        try {
            Pipeline p = this.enrichUsingIMap();
            Job job = this.jet.newJob(p);
            eventGenerator.generateEventsForFiveSeconds();
            job.cancel();
            try {
                job.join();
            }
            catch (CancellationException cancellationException) {
                // empty catch block
            }
        }
        finally {
            eventGenerator.shutdown();
            Jet.shutdownAll();
        }
    }

    private static Stream<Map.Entry<Integer, String>> readLines(String file) {
        try {
            InputStream stream = Enrichment.class.getResourceAsStream("/" + file);
            BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
            return reader.lines().map(Enrichment::splitLine);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static Map.Entry<Integer, String> splitLine(String e) {
        int commaPos = e.indexOf(44);
        return Util.entry((Object)Integer.valueOf(e.substring(0, commaPos)), (Object)e.substring(commaPos + 1));
    }

    private static <K, V> void printMap(Map<K, V> imap) {
        StringBuilder sb = new StringBuilder();
        imap.forEach((k, v) -> sb.append(k).append("->").append(v).append('\n'));
        System.out.println(sb);
    }

    private static Path getClasspathDirectory(String name) {
        try {
            return Paths.get(Enrichment.class.getResource(name).toURI());
        }
        catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }
}

