/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.imdg;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.BinaryOperatorEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.Functions;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.IMap;
import java.io.Serializable;
import java.util.Map;

public class MapSourceAndSinks {
    private static final int ITEM_COUNT = 10;
    private static final String MAP_SOURCE = "mapSource";
    private static final String MAP_SINK = "mapSink";
    private static final String MAP_WITH_MERGING_SOURCE = "mapWithMergingSource";
    private static final String MAP_WITH_MERGING_SINK = "mapWithMergingSink";
    private static final String MAP_WITH_UPDATING_SOURCE_SINK = "mapWithUpdatingSourceSink";
    private static final String MAP_WITH_ENTRYPROCESSOR_SOURCE_SINK = "mapWithEntryProcessorSourceSink";
    private final JetInstance jet;

    public MapSourceAndSinks(JetInstance jet) {
        this.jet = jet;
    }

    private static Pipeline mapSourceAndSink(String sourceMapName, String sinkMapName) {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(Sources.map((String)sourceMapName)).writeTo(Sinks.map((String)sinkMapName));
        return pipeline;
    }

    private static Pipeline mapWithUpdating(String mapName) {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(Sources.map((String)mapName)).writeTo(Sinks.mapWithUpdating((String)mapName, (BiFunctionEx & Serializable)(oldValue, item) -> (Integer)item.getKey() % 2 == 0 ? oldValue + "-even" : oldValue + "-odd"));
        return pipeline;
    }

    private static Pipeline mapWithMerging(String sourceMapName, String sinkMapName) {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(Sources.map((String)sourceMapName)).map((FunctionEx & Serializable)e -> Util.entry((Object)"sum", e.getValue())).writeTo(Sinks.mapWithMerging((String)sinkMapName, (BinaryOperatorEx & Serializable)(oldValue, newValue) -> oldValue + newValue));
        return pipeline;
    }

    private static Pipeline mapWithEntryProcessor(String sourceMapName, String sinkMapName) {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(Sources.map((String)sourceMapName)).writeTo(Sinks.mapWithEntryProcessor((String)sinkMapName, (FunctionEx)Functions.entryKey(), (FunctionEx & Serializable)item -> new IncrementEntryProcessor(5)));
        return pipeline;
    }

    public static void main(String[] args) {
        JetInstance jet = Jet.newJetInstance();
        new MapSourceAndSinks(jet).go();
    }

    private void go() {
        try {
            System.out.println("----------Map Source and Sink ----------------");
            MapSourceAndSinks.prepareSampleInput(this.jet, MAP_SOURCE);
            this.jet.newJob(MapSourceAndSinks.mapSourceAndSink(MAP_SOURCE, MAP_SINK)).join();
            MapSourceAndSinks.dumpMap(this.jet, MAP_SINK);
            System.out.println("----------------------------------------------");
            System.out.println("--------------Map with Merging----------------");
            MapSourceAndSinks.prepareSampleInput(this.jet, MAP_WITH_MERGING_SOURCE);
            this.jet.newJob(MapSourceAndSinks.mapWithMerging(MAP_WITH_MERGING_SOURCE, MAP_WITH_MERGING_SINK)).join();
            MapSourceAndSinks.dumpMap(this.jet, MAP_WITH_MERGING_SINK);
            System.out.println("----------------------------------------------");
            System.out.println("------------Map with Updating ----------------");
            MapSourceAndSinks.prepareMapWithUpdatingSampleInput(this.jet, MAP_WITH_UPDATING_SOURCE_SINK);
            this.jet.newJob(MapSourceAndSinks.mapWithUpdating(MAP_WITH_UPDATING_SOURCE_SINK)).join();
            MapSourceAndSinks.dumpMap(this.jet, MAP_WITH_UPDATING_SOURCE_SINK);
            System.out.println("----------------------------------------------");
            System.out.println("----------Map with EntryProcessor ------------");
            MapSourceAndSinks.prepareSampleInput(this.jet, MAP_WITH_ENTRYPROCESSOR_SOURCE_SINK);
            this.jet.newJob(MapSourceAndSinks.mapWithEntryProcessor(MAP_WITH_ENTRYPROCESSOR_SOURCE_SINK, MAP_WITH_ENTRYPROCESSOR_SOURCE_SINK)).join();
            MapSourceAndSinks.dumpMap(this.jet, MAP_WITH_ENTRYPROCESSOR_SOURCE_SINK);
            System.out.println("----------------------------------------------");
        }
        finally {
            Jet.shutdownAll();
        }
    }

    private static void dumpMap(JetInstance instance, String mapName) {
        IMap sinkMap = instance.getMap(mapName);
        System.out.println("Sink map size: " + sinkMap.size());
        System.out.println("Sink map entries: ");
        sinkMap.forEach((k, v) -> System.out.println(k + " - " + v));
    }

    private static void prepareSampleInput(JetInstance instance, String sourceMapName) {
        IMap sourceMap = instance.getMap(sourceMapName);
        for (int i = 0; i < 10; ++i) {
            sourceMap.put((Object)i, (Object)i);
        }
    }

    private static void prepareMapWithUpdatingSampleInput(JetInstance instance, String sourceMapName) {
        IMap sourceMap = instance.getMap(sourceMapName);
        for (int i = 0; i < 10; ++i) {
            sourceMap.put((Object)i, (Object)Integer.toString(i));
        }
    }

    static class IncrementEntryProcessor
    implements EntryProcessor<Integer, Integer, Integer> {
        private int incrementBy;

        IncrementEntryProcessor(int incrementBy) {
            this.incrementBy = incrementBy;
        }

        public Integer process(Map.Entry<Integer, Integer> entry) {
            return entry.setValue(entry.getValue() + this.incrementBy);
        }
    }
}

