/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tinkerpop.gremlin.spark.process.computer;

import com.google.common.base.Optional;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMapEmitter;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMessenger;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkReduceEmitter;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.Host;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;

public final class SparkExecutor {
    /** Shared zero-length array used when a vertex program reports no element compute keys. */
    private static final String[] EMPTY_ARRAY = new String[0];

    /** Private constructor: every member of this class is static, so it is never instantiated. */
    private SparkExecutor() {
    }

    /**
     * Runs a single iteration of the configured vertex program over every vertex in {@code graphRDD}
     * and produces the views and messages that each vertex receives as input to the next iteration.
     *
     * <p>The work happens in two phases. First, per partition, a worker-local vertex program is
     * created from {@code apacheConfiguration}, the previous view (if any) is re-attached to each
     * vertex, the program is executed, and the resulting view plus outgoing messages are collected.
     * Second, views and messages are keyed by vertex id and reduced into one
     * {@link ViewIncomingPayload} per vertex.
     *
     * @param graphRDD            the graph, keyed by vertex id
     * @param viewIncomingRDD     the result of the previous iteration, or {@code null} when there is none
     * @param memory              the memory handed to the vertex program; an immutable view of it is
     *                            passed to the worker start/end callbacks
     * @param apacheConfiguration the configuration used to open the graph and create the vertex program
     * @param <M>                 the message type of the vertex program
     * @return the incoming views and messages for the next iteration, keyed by vertex id
     */
    public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(JavaPairRDD<Object, VertexWritable> graphRDD, JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, SparkMemory memory, org.apache.commons.configuration.Configuration apacheConfiguration) {
        // Without a previous result every vertex is paired with an absent payload; otherwise the graph
        // is left-outer-joined with the previous views and messages.
        JavaPairRDD viewOutgoingRDD = (null == viewIncomingRDD ? graphRDD.mapValues((Function & Serializable)vertexWritable -> new Tuple2(vertexWritable, (Object)Optional.absent())) : graphRDD.leftOuterJoin(viewIncomingRDD)).mapPartitionsToPair((PairFlatMapFunction & Serializable)partitionIterator -> {
            HadoopPools.initialize((org.apache.commons.configuration.Configuration)apacheConfiguration);
            // workerIterationEnd is only triggered from the per-vertex function below, so an empty
            // partition would start a worker iteration that is never ended. Skip such partitions
            // before any worker state is created.
            if (!partitionIterator.hasNext()) {
                return Collections.emptyList();
            }
            // Each partition works with its own vertex program instance.
            VertexProgram workerVertexProgram = VertexProgram.createVertexProgram((Graph)HadoopGraph.open((org.apache.commons.configuration.Configuration)apacheConfiguration), (org.apache.commons.configuration.Configuration)apacheConfiguration);
            Set elementComputeKeys = workerVertexProgram.getElementComputeKeys();
            String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? EMPTY_ARRAY : elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
            SparkMessenger messenger = new SparkMessenger();
            workerVertexProgram.workerIterationStart(memory.asImmutable());
            // The returned Iterable is lazy: vertices are processed as the result is consumed.
            return () -> IteratorUtils.map((Iterator)partitionIterator, vertexViewIncoming -> {
                boolean hasViewAndMessages;
                StarGraph.StarVertex vertex = ((VertexWritable)((Tuple2)vertexViewIncoming._2())._1()).get();
                // Remove the compute-key properties stored on the vertex before the previous view is
                // attached again.
                if (elementComputeKeysArray.length > 0) {
                    vertex.dropVertexProperties(elementComputeKeysArray);
                }
                // The payload is absent when nothing was joined for this vertex.
                List<DetachedVertexProperty> previousView = (hasViewAndMessages = ((Optional)((Tuple2)vertexViewIncoming._2())._2()).isPresent()) ? ((ViewIncomingPayload)((Optional)((Tuple2)vertexViewIncoming._2())._2()).get()).getView() : Collections.emptyList();
                List incomingMessages = hasViewAndMessages ? ((ViewIncomingPayload)((Optional)((Tuple2)vertexViewIncoming._2())._2()).get()).getIncomingMessages() : Collections.emptyList();
                // Attach every property of the previous view to the vertex; the return value is not needed.
                previousView.forEach(property -> property.attach(Attachable.Method.create((Host)vertex)));
                messenger.setVertexAndIncomingMessages((Vertex)vertex, incomingMessages);
                workerVertexProgram.execute((Vertex)ComputerGraph.vertexProgram((Vertex)vertex, (VertexProgram)workerVertexProgram), (Messenger)messenger, (Memory)memory);
                // With no compute keys there is no view to carry over; otherwise detach the
                // compute-key properties of the vertex as the next view.
                List nextView = elementComputeKeysArray.length == 0 ? Collections.emptyList() : IteratorUtils.list((Iterator)IteratorUtils.map((Iterator)vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach((VertexProperty)property, (boolean)true)));
                List outgoingMessages = messenger.getOutgoingMessages();
                // After the last vertex of the partition the worker iteration is ended.
                if (!partitionIterator.hasNext()) {
                    workerVertexProgram.workerIterationEnd(memory.asImmutable());
                }
                return new Tuple2(vertex.id(), new ViewOutgoingPayload(nextView, outgoingMessages));
            });
        }).setName("viewOutgoingRDD");
        // The combiner is null when the vertex program does not provide one.
        MessageCombiner messageCombiner = VertexProgram.createVertexProgram((Graph)HadoopGraph.open((org.apache.commons.configuration.Configuration)apacheConfiguration), (org.apache.commons.configuration.Configuration)apacheConfiguration).getMessageCombiner().orElse(null);
        // Every vertex emits its view under its own id and one MessagePayload per outgoing message
        // under the id in message._1(); reduceByKey then merges all payloads that share an id.
        JavaPairRDD newViewIncomingRDD = viewOutgoingRDD.flatMapToPair((PairFlatMapFunction & Serializable)tuple -> () -> IteratorUtils.concat((Iterator[])new Iterator[]{IteratorUtils.of((Object)new Tuple2(tuple._1(), (Object)((ViewOutgoingPayload)tuple._2()).getView())), IteratorUtils.map(((ViewOutgoingPayload)tuple._2()).getOutgoingMessages().iterator(), message -> new Tuple2(message._1(), new MessagePayload<Object>(message._2())))})).reduceByKey((Function2 & Serializable)(a, b) -> {
            // Merge into whichever side is already a ViewIncomingPayload; otherwise start a new one.
            if (a instanceof ViewIncomingPayload) {
                ((ViewIncomingPayload)a).mergePayload((Payload)b, messageCombiner);
                return a;
            }
            if (b instanceof ViewIncomingPayload) {
                ((ViewIncomingPayload)b).mergePayload((Payload)a, messageCombiner);
                return b;
            }
            ViewIncomingPayload c = new ViewIncomingPayload(messageCombiner);
            c.mergePayload((Payload)a, messageCombiner);
            c.mergePayload((Payload)b, messageCombiner);
            return c;
        // First filter: drop ids whose only value is a single MessagePayload (no view was emitted
        // for that id). Second filter: drop merged payloads that contain no view. Finally, wrap any
        // value that was never merged (a bare view payload) in a ViewIncomingPayload.
        }).filter((Function & Serializable)payload -> !(payload._2() instanceof MessagePayload)).filter((Function & Serializable)payload -> !(payload._2() instanceof ViewIncomingPayload) || ((ViewIncomingPayload)payload._2()).hasView()).mapValues((Function & Serializable)payload -> payload instanceof ViewIncomingPayload ? (ViewIncomingPayload)payload : new ViewIncomingPayload((ViewPayload)payload));
        // foreachPartition is a Spark action, so this call forces the iteration to be evaluated here.
        newViewIncomingRDD.setName("viewIncomingRDD").foreachPartition((VoidFunction & Serializable)partitionIterator -> HadoopPools.initialize((org.apache.commons.configuration.Configuration)apacheConfiguration));
        return newViewIncomingRDD;
    }

    /**
     * Prepares the graph RDD for MapReduce processing by dropping the edges of every vertex and,
     * when a view RDD is supplied, replacing the compute-key properties with that view.
     *
     * @param graphRDD           the graph, keyed by vertex id
     * @param viewIncomingRDD    the views to attach to the vertices, or {@code null} if there are none
     * @param elementComputeKeys the property keys dropped from each vertex before its view is attached
     * @param <M>                the message type carried by the view payloads
     * @return the graph RDD whose vertices have been modified in place
     */
    public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(JavaPairRDD<Object, VertexWritable> graphRDD, JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, String[] elementComputeKeys) {
        if (null == viewIncomingRDD) {
            // No views to attach: only the edges are removed.
            return graphRDD.mapValues(writable -> {
                writable.get().dropEdges();
                return writable;
            });
        }
        return graphRDD.leftOuterJoin(viewIncomingRDD).mapValues(joined -> {
            final VertexWritable writable = joined._1();
            final StarGraph.StarVertex starVertex = writable.get();
            starVertex.dropEdges();
            starVertex.dropVertexProperties(elementComputeKeys);
            // A vertex without a joined payload simply keeps no view properties.
            final Optional<ViewIncomingPayload<M>> payload = joined._2();
            if (payload.isPresent()) {
                payload.get().getView().forEach(property -> property.attach(Attachable.Method.create(starVertex)));
            }
            return writable;
        });
    }

    /**
     * Executes the map stage of a MapReduce job over every vertex of the graph RDD.
     *
     * <p>Each partition creates its own MapReduce instance from {@code apacheConfiguration}, calls
     * {@code workerStart(MAP)}, maps every vertex lazily, and calls {@code workerEnd(MAP)} once the
     * partition is exhausted. If {@code mapReduce} supplies a map-key comparator, the result is
     * sorted by key with it.
     *
     * @param graphRDD            the graph, keyed by vertex id
     * @param mapReduce           consulted only for its optional map-key sort
     * @param apacheConfiguration the configuration used to open the graph and create the worker MapReduce
     * @param <K>                 the map key type
     * @param <V>                 the map value type
     * @return the key/value pairs emitted by the map stage
     */
    public static <K, V> JavaPairRDD<K, V> executeMap(JavaPairRDD<Object, VertexWritable> graphRDD, MapReduce<K, V, ?, ?, ?> mapReduce, org.apache.commons.configuration.Configuration apacheConfiguration) {
        JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
            HadoopPools.initialize(apacheConfiguration);
            final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
            workerMapReduce.workerStart(MapReduce.Stage.MAP);
            // The per-vertex function below can only end the stage after processing a vertex, so
            // an empty partition must be ended here to keep workerStart/workerEnd paired.
            if (!partitionIterator.hasNext()) {
                workerMapReduce.workerEnd(MapReduce.Stage.MAP);
            }
            final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
            // The returned Iterable is lazy: vertices are mapped as the result is consumed.
            return () -> IteratorUtils.flatMap(partitionIterator, vertexWritable -> {
                workerMapReduce.map(ComputerGraph.mapReduce(vertexWritable._2().get()), mapEmitter);
                // After the last vertex of the partition the stage is ended.
                if (!partitionIterator.hasNext()) {
                    workerMapReduce.workerEnd(MapReduce.Stage.MAP);
                }
                return mapEmitter.getEmissions();
            });
        });
        if (mapReduce.getMapKeySort().isPresent()) {
            mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
        }
        return mapRDD;
    }

    /**
     * Executes the reduce stage of a MapReduce job over the grouped output of the map stage.
     *
     * <p>The map output is grouped by key. Each partition creates its own MapReduce instance from
     * {@code apacheConfiguration}, calls {@code workerStart(REDUCE)}, reduces every key group lazily,
     * and calls {@code workerEnd(REDUCE)} once the partition is exhausted. If {@code mapReduce}
     * supplies a reduce-key comparator, the result is sorted by key with it.
     *
     * @param mapRDD              the key/value pairs produced by the map stage
     * @param mapReduce           consulted only for its optional reduce-key sort
     * @param apacheConfiguration the configuration used to open the graph and create the worker MapReduce
     * @param <K>                 the map key type
     * @param <V>                 the map value type
     * @param <OK>                the reduce key type
     * @param <OV>                the reduce value type
     * @return the key/value pairs emitted by the reduce stage
     */
    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(JavaPairRDD<K, V> mapRDD, MapReduce<K, V, OK, OV, ?> mapReduce, org.apache.commons.configuration.Configuration apacheConfiguration) {
        JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
            HadoopPools.initialize(apacheConfiguration);
            final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
            workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
            // The per-key function below can only end the stage after processing a key group, so
            // an empty partition must be ended here to keep workerStart/workerEnd paired.
            if (!partitionIterator.hasNext()) {
                workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
            }
            final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
            // The returned Iterable is lazy: key groups are reduced as the result is consumed.
            return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
                workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
                // After the last key group of the partition the stage is ended.
                if (!partitionIterator.hasNext()) {
                    workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
                }
                return reduceEmitter.getEmissions();
            });
        });
        if (mapReduce.getReduceKeySort().isPresent()) {
            reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
        }
        return reduceRDD;
    }

    /**
     * Persists the result of a MapReduce job and registers it with the supplied memory.
     *
     * <p>Nothing happens unless the Hadoop configuration defines {@code gremlin.hadoop.outputLocation}.
     * When it does, the pairs are wrapped in {@link ObjectWritable}s and written as a sequence file
     * to {@code <outputLocation>/<memoryKey>}; the same location is then read back and handed to
     * {@code mapReduce.addResultToMemory}.
     *
     * @param mapReduceRDD        the key/value pairs to save
     * @param mapReduce           supplies the memory key and receives the stored result
     * @param memory              the memory the result is added to
     * @param hadoopConfiguration the Hadoop configuration providing the output location
     * @throws IllegalStateException if reading the saved result back fails with an {@link IOException}
     */
    public static void saveMapReduceRDD(JavaPairRDD<Object, Object> mapReduceRDD, MapReduce mapReduce, Memory.Admin memory, Configuration hadoopConfiguration) {
        final String outputLocation = hadoopConfiguration.get("gremlin.hadoop.outputLocation", null);
        if (null == outputLocation) {
            return;
        }
        // Convert to writable pairs so the result can be stored as a sequence file.
        mapReduceRDD
                .mapToPair(entry -> new Tuple2<>(new ObjectWritable<>(entry._1()), new ObjectWritable<>(entry._2())))
                .saveAsNewAPIHadoopFile(
                        outputLocation + "/" + mapReduce.getMemoryKey(),
                        ObjectWritable.class,
                        ObjectWritable.class,
                        SequenceFileOutputFormat.class,
                        hadoopConfiguration);
        try {
            final Path resultPath = new Path(outputLocation + "/" + mapReduce.getMemoryKey());
            mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, resultPath));
        } catch (final IOException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

