/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tinkerpop.gremlin.spark.process.computer;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkExecutor;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
import org.apache.tinkerpop.gremlin.structure.Graph;

public final class SparkGraphComputer
extends AbstractHadoopGraphComputer {
    private final Configuration sparkConfiguration = new HadoopConfiguration();

    public SparkGraphComputer(HadoopGraph hadoopGraph) {
        super(hadoopGraph);
        ConfigurationUtils.copy((Configuration)this.hadoopGraph.configuration(), (Configuration)this.sparkConfiguration);
    }

    public GraphComputer workers(int workers) {
        super.workers(workers);
        if (this.sparkConfiguration.getString("spark.master").startsWith("local")) {
            this.sparkConfiguration.setProperty("spark.master", (Object)("local[" + this.workers + "]"));
        }
        return this;
    }

    public GraphComputer configure(String key, Object value) {
        this.sparkConfiguration.setProperty(key, value);
        return this;
    }

    protected void validateStatePriorToExecution() {
        super.validateStatePriorToExecution();
        if (this.sparkConfiguration.containsKey("gremlin.spark.graphInputRDD") && this.sparkConfiguration.containsKey("gremlin.hadoop.graphInputFormat")) {
            this.logger.warn("Both gremlin.spark.graphInputRDD and gremlin.hadoop.graphInputFormat were specified, ignoring gremlin.hadoop.graphInputFormat");
        }
        if (this.sparkConfiguration.containsKey("gremlin.spark.graphOutputRDD") && this.sparkConfiguration.containsKey("gremlin.hadoop.graphOutputFormat")) {
            this.logger.warn("Both gremlin.spark.graphOutputRDD and gremlin.hadoop.graphOutputFormat were specified, ignoring gremlin.hadoop.graphOutputFormat");
        }
    }

    public Future<ComputerResult> submit() {
        this.validateStatePriorToExecution();
        HadoopConfiguration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
        apacheConfiguration.setProperty("gremlin.hadoop.graphInputFormat.hasEdges", (Object)this.persist.equals((Object)GraphComputer.Persist.EDGES));
        org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration((Configuration)apacheConfiguration);
        if (hadoopConfiguration.get("gremlin.spark.graphInputRDD", null) == null && hadoopConfiguration.get("gremlin.hadoop.graphInputFormat", null) != null && FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphInputFormat", InputFormat.class))) {
            try {
                String inputLocation = FileSystem.get((org.apache.hadoop.conf.Configuration)hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get("gremlin.hadoop.inputLocation"))).getPath().toString();
                apacheConfiguration.setProperty("mapreduce.input.fileinputformat.inputdir", (Object)inputLocation);
                hadoopConfiguration.set("mapreduce.input.fileinputformat.inputdir", inputLocation);
            }
            catch (IOException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
        return CompletableFuture.supplyAsync(() -> this.lambda$submit$21(hadoopConfiguration, (Configuration)apacheConfiguration));
    }

    private void loadJars(JavaSparkContext sparkContext, org.apache.hadoop.conf.Configuration hadoopConfiguration) {
        if (hadoopConfiguration.getBoolean("gremlin.hadoop.jarsInDistributedCache", true)) {
            String hadoopGremlinLocalLibs;
            String string = hadoopGremlinLocalLibs = null == System.getProperty("HADOOP_GREMLIN_LIBS") ? System.getenv("HADOOP_GREMLIN_LIBS") : System.getProperty("HADOOP_GREMLIN_LIBS");
            if (null == hadoopGremlinLocalLibs) {
                this.logger.warn("HADOOP_GREMLIN_LIBS is not set -- proceeding regardless");
            } else {
                String[] paths;
                for (String path : paths = hadoopGremlinLocalLibs.split(":")) {
                    File file = new File(path);
                    if (file.exists()) {
                        Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(".jar")).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
                        continue;
                    }
                    this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
                }
            }
        }
    }

    private void updateLocalConfiguration(JavaSparkContext sparkContext, SparkConf sparkConfiguration) {
        String[] validPropertyNames;
        for (String propertyName : validPropertyNames = new String[]{"spark.job.description", "spark.jobGroup.id", "spark.job.interruptOnCancel", "spark.scheduler.pool"}) {
            if (!sparkConfiguration.contains(propertyName)) continue;
            String propertyValue = sparkConfiguration.get(propertyName);
            this.logger.info("Setting Thread Local SparkContext Property - " + propertyName + " : " + propertyValue);
            sparkContext.setLocalProperty(propertyName, sparkConfiguration.get(propertyName));
        }
    }

    public static void main(String[] args) throws Exception {
        PropertiesConfiguration configuration = new PropertiesConfiguration(args[0]);
        new SparkGraphComputer(HadoopGraph.open((Configuration)configuration)).program(VertexProgram.createVertexProgram((Graph)HadoopGraph.open((Configuration)configuration), (Configuration)configuration)).submit().get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private /* synthetic */ ComputerResult lambda$submit$21(org.apache.hadoop.conf.Configuration configuration, Configuration configuration2) {
        long startTime = System.currentTimeMillis();
        SparkMemory memory = null;
        String outputLocation = configuration.get("gremlin.hadoop.outputLocation", null);
        try {
            if (null != outputLocation && FileSystem.get((org.apache.hadoop.conf.Configuration)configuration).exists(new Path(outputLocation))) {
                FileSystem.get((org.apache.hadoop.conf.Configuration)configuration).delete(new Path(outputLocation), true);
            }
        }
        catch (IOException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        SparkConf sparkConfiguration = new SparkConf();
        sparkConfiguration.setAppName("HadoopGremlin(Spark): " + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
        configuration.forEach(entry -> sparkConfiguration.set((String)entry.getKey(), (String)entry.getValue()));
        JavaSparkContext sparkContext = null;
        try {
            MapMemory finalMemory;
            JavaPairRDD graphRDD;
            sparkContext = new JavaSparkContext(SparkContext.getOrCreate((SparkConf)sparkConfiguration));
            this.updateLocalConfiguration(sparkContext, sparkConfiguration);
            this.loadJars(sparkContext, configuration);
            try {
                graphRDD = ((InputRDD)configuration.getClass("gremlin.spark.graphInputRDD", InputFormatRDD.class, InputRDD.class).newInstance()).readGraphRDD(configuration2, sparkContext).setName(sparkConfiguration.get("gremlin.hadoop.outputLocation", "graphRDD")).cache();
            }
            catch (IllegalAccessException | InstantiationException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            JavaPairRDD viewIncomingRDD = null;
            if (null != this.vertexProgram) {
                memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                this.vertexProgram.setup((Memory)memory);
                memory.broadcastMemory(sparkContext);
                HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
                this.vertexProgram.storeState((Configuration)vertexProgramConfiguration);
                ConfigurationUtils.copy((Configuration)vertexProgramConfiguration, (Configuration)configuration2);
                ConfUtil.mergeApacheIntoHadoopConfiguration((Configuration)vertexProgramConfiguration, (org.apache.hadoop.conf.Configuration)configuration);
                while (true) {
                    memory.setInTask(true);
                    viewIncomingRDD = SparkExecutor.executeVertexProgramIteration((JavaPairRDD<Object, VertexWritable>)graphRDD, viewIncomingRDD, memory, (Configuration)vertexProgramConfiguration);
                    memory.setInTask(false);
                    if (this.vertexProgram.terminate((Memory)memory)) break;
                    memory.incrIteration();
                    memory.broadcastMemory(sparkContext);
                }
                if (!(configuration.get("gremlin.hadoop.graphOutputFormat", null) == null && configuration.get("gremlin.spark.graphOutputRDD", null) == null || this.persist.equals((Object)GraphComputer.Persist.NOTHING))) {
                    try {
                        ((OutputRDD)configuration.getClass("gremlin.spark.graphOutputRDD", OutputFormatRDD.class, OutputRDD.class).newInstance()).writeGraphRDD(configuration2, (JavaPairRDD<Object, VertexWritable>)graphRDD);
                    }
                    catch (IllegalAccessException | InstantiationException e) {
                        throw new IllegalStateException(e.getMessage(), e);
                    }
                }
            }
            MapMemory mapMemory = finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
            if (!this.mapReducers.isEmpty()) {
                String[] elementComputeKeys = this.vertexProgram == null ? new String[]{} : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
                JavaPairRDD mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce((JavaPairRDD<Object, VertexWritable>)graphRDD, viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
                for (MapReduce mapReduce : this.mapReducers) {
                    HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(configuration2);
                    mapReduce.storeState((Configuration)newApacheConfiguration);
                    JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD<Object, VertexWritable>)mapReduceGraphRDD, mapReduce, (Configuration)newApacheConfiguration).setName("mapRDD");
                    JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(mapRDD, mapReduce, (Configuration)newApacheConfiguration).setName("reduceRDD") : null;
                    SparkExecutor.saveMapReduceRDD((JavaPairRDD<Object, Object>)(null == reduceRDD ? mapRDD : reduceRDD), mapReduce, (Memory.Admin)finalMemory, configuration);
                }
                mapReduceGraphRDD.unpersist();
            }
            if (configuration.get("gremlin.spark.graphOutputRDD", null) == null || this.persist.equals((Object)GraphComputer.Persist.NOTHING)) {
                graphRDD.unpersist();
            }
            finalMemory.setRuntime(System.currentTimeMillis() - startTime);
            DefaultComputerResult defaultComputerResult = new DefaultComputerResult((Graph)InputOutputHelper.getOutputGraph(configuration2, this.resultGraph, this.persist), finalMemory.asImmutable());
            return defaultComputerResult;
        }
        finally {
            if (sparkContext != null && !configuration2.getBoolean("gremlin.spark.persistContext", false)) {
                sparkContext.stop();
            }
        }
    }
}

