/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tinkerpop.gremlin.spark.process.computer;

import com.google.common.base.Optional;
import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
import org.apache.tinkerpop.gremlin.spark.process.computer.CombineIterator;
import org.apache.tinkerpop.gremlin.spark.process.computer.MapIterator;
import org.apache.tinkerpop.gremlin.spark.process.computer.ReduceIterator;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMessenger;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.Host;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;

public final class SparkExecutor {
    private static final String[] EMPTY_ARRAY = new String[0];

    private SparkExecutor() {
    }

    public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(JavaPairRDD<Object, VertexWritable> graphRDD, JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, SparkMemory memory, Configuration apacheConfiguration) {
        if (null != viewIncomingRDD) assert (((Partitioner)graphRDD.partitioner().get()).equals(viewIncomingRDD.partitioner().get()));
        JavaPairRDD viewOutgoingRDD = (null == viewIncomingRDD ? graphRDD.mapValues((Function & Serializable)vertexWritable -> new Tuple2(vertexWritable, (Object)Optional.absent())) : graphRDD.leftOuterJoin(viewIncomingRDD)).mapPartitionsToPair((PairFlatMapFunction & Serializable)partitionIterator -> {
            HadoopPools.initialize((Configuration)apacheConfiguration);
            VertexProgram workerVertexProgram = VertexProgram.createVertexProgram((Graph)HadoopGraph.open((Configuration)apacheConfiguration), (Configuration)apacheConfiguration);
            Set elementComputeKeys = workerVertexProgram.getElementComputeKeys();
            String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? EMPTY_ARRAY : elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
            SparkMessenger messenger = new SparkMessenger();
            workerVertexProgram.workerIterationStart(memory.asImmutable());
            return () -> IteratorUtils.map((Iterator)partitionIterator, vertexViewIncoming -> {
                boolean hasViewAndMessages;
                StarGraph.StarVertex vertex = ((VertexWritable)((Tuple2)vertexViewIncoming._2())._1()).get();
                if (elementComputeKeysArray.length > 0) {
                    vertex.dropVertexProperties(elementComputeKeysArray);
                }
                List<DetachedVertexProperty> previousView = (hasViewAndMessages = ((Optional)((Tuple2)vertexViewIncoming._2())._2()).isPresent()) ? ((ViewIncomingPayload)((Optional)((Tuple2)vertexViewIncoming._2())._2()).get()).getView() : Collections.emptyList();
                List incomingMessages = hasViewAndMessages ? ((ViewIncomingPayload)((Optional)((Tuple2)vertexViewIncoming._2())._2()).get()).getIncomingMessages() : Collections.emptyList();
                previousView.forEach(property -> {
                    VertexProperty cfr_ignored_0 = (VertexProperty)property.attach(Attachable.Method.create((Host)vertex));
                });
                messenger.setVertexAndIncomingMessages((Vertex)vertex, incomingMessages);
                workerVertexProgram.execute((Vertex)ComputerGraph.vertexProgram((Vertex)vertex, (VertexProgram)workerVertexProgram), (Messenger)messenger, (Memory)memory);
                List nextView = elementComputeKeysArray.length == 0 ? Collections.emptyList() : IteratorUtils.list((Iterator)IteratorUtils.map((Iterator)vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach((VertexProperty)property, (boolean)true)));
                List outgoingMessages = messenger.getOutgoingMessages();
                if (!partitionIterator.hasNext()) {
                    workerVertexProgram.workerIterationEnd(memory.asImmutable());
                }
                return new Tuple2(vertex.id(), new ViewOutgoingPayload(nextView, outgoingMessages));
            });
        }, true);
        assert (((Partitioner)graphRDD.partitioner().get()).equals(viewOutgoingRDD.partitioner().get()));
        MessageCombiner messageCombiner = VertexProgram.createVertexProgram((Graph)HadoopGraph.open((Configuration)apacheConfiguration), (Configuration)apacheConfiguration).getMessageCombiner().orElse(null);
        JavaPairRDD newViewIncomingRDD = viewOutgoingRDD.flatMapToPair((PairFlatMapFunction & Serializable)tuple -> () -> IteratorUtils.concat((Iterator[])new Iterator[]{IteratorUtils.of((Object)new Tuple2(tuple._1(), (Object)((ViewOutgoingPayload)tuple._2()).getView())), IteratorUtils.map(((ViewOutgoingPayload)tuple._2()).getOutgoingMessages().iterator(), message -> new Tuple2(message._1(), new MessagePayload<Object>(message._2())))})).reduceByKey((Partitioner)graphRDD.partitioner().get(), (Function2 & Serializable)(a, b) -> {
            if (a instanceof ViewIncomingPayload) {
                ((ViewIncomingPayload)a).mergePayload((Payload)b, messageCombiner);
                return a;
            }
            if (b instanceof ViewIncomingPayload) {
                ((ViewIncomingPayload)b).mergePayload((Payload)a, messageCombiner);
                return b;
            }
            ViewIncomingPayload c = new ViewIncomingPayload(messageCombiner);
            c.mergePayload((Payload)a, messageCombiner);
            c.mergePayload((Payload)b, messageCombiner);
            return c;
        }).filter((Function & Serializable)payload -> !(payload._2() instanceof MessagePayload)).filter((Function & Serializable)payload -> !(payload._2() instanceof ViewIncomingPayload) || ((ViewIncomingPayload)payload._2()).hasView()).mapValues((Function & Serializable)payload -> payload instanceof ViewIncomingPayload ? (ViewIncomingPayload)payload : new ViewIncomingPayload((ViewPayload)payload));
        assert (((Partitioner)graphRDD.partitioner().get()).equals(newViewIncomingRDD.partitioner().get()));
        newViewIncomingRDD.foreachPartition((VoidFunction & Serializable)partitionIterator -> HadoopPools.initialize((Configuration)apacheConfiguration));
        return newViewIncomingRDD;
    }

    public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(JavaPairRDD<Object, VertexWritable> graphRDD, JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, String[] elementComputeKeys) {
        assert (((Partitioner)graphRDD.partitioner().get()).equals(viewIncomingRDD.partitioner().get()));
        return graphRDD.leftOuterJoin(viewIncomingRDD).mapValues((Function & Serializable)tuple -> {
            StarGraph.StarVertex vertex = ((VertexWritable)tuple._1()).get();
            vertex.dropVertexProperties(elementComputeKeys);
            List<DetachedVertexProperty> view = ((Optional)tuple._2()).isPresent() ? ((ViewIncomingPayload)((Optional)tuple._2()).get()).getView() : Collections.emptyList();
            view.forEach(property -> {
                VertexProperty cfr_ignored_0 = (VertexProperty)property.attach(Attachable.Method.create((Host)vertex));
            });
            return (VertexWritable)tuple._1();
        });
    }

    public static <K, V> JavaPairRDD<K, V> executeMap(JavaPairRDD<Object, VertexWritable> graphRDD, MapReduce<K, V, ?, ?, ?> mapReduce, Configuration apacheConfiguration) {
        JavaPairRDD mapRDD = graphRDD.mapPartitionsToPair((PairFlatMapFunction & Serializable)partitionIterator -> {
            HadoopPools.initialize((Configuration)apacheConfiguration);
            return () -> new MapIterator(MapReduce.createMapReduce((Graph)HadoopGraph.open((Configuration)apacheConfiguration), (Configuration)apacheConfiguration), (Iterator<Tuple2<Object, VertexWritable>>)partitionIterator);
        });
        if (mapReduce.getMapKeySort().isPresent()) {
            mapRDD = mapRDD.sortByKey((Comparator)mapReduce.getMapKeySort().get(), true, 1);
        }
        return mapRDD;
    }

    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(JavaPairRDD<K, V> mapRDD, Configuration apacheConfiguration) {
        return mapRDD.mapPartitionsToPair((PairFlatMapFunction & Serializable)partitionIterator -> {
            HadoopPools.initialize((Configuration)apacheConfiguration);
            return () -> new CombineIterator(MapReduce.createMapReduce((Graph)HadoopGraph.open((Configuration)apacheConfiguration), (Configuration)apacheConfiguration), partitionIterator);
        });
    }

    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(JavaPairRDD<K, V> mapOrCombineRDD, MapReduce<K, V, OK, OV, ?> mapReduce, Configuration apacheConfiguration) {
        JavaPairRDD reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair((PairFlatMapFunction & Serializable)partitionIterator -> {
            HadoopPools.initialize((Configuration)apacheConfiguration);
            return () -> new ReduceIterator(MapReduce.createMapReduce((Graph)HadoopGraph.open((Configuration)apacheConfiguration), (Configuration)apacheConfiguration), partitionIterator);
        });
        if (mapReduce.getReduceKeySort().isPresent()) {
            reduceRDD = reduceRDD.sortByKey((Comparator)mapReduce.getReduceKeySort().get(), true, 1);
        }
        return reduceRDD;
    }
}

