/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tinkerpop.gremlin.spark.process.computer;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmissionHelper;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkExecutor;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
import org.apache.tinkerpop.gremlin.structure.Graph;

public final class SparkGraphComputer
extends AbstractHadoopGraphComputer {
    private final org.apache.commons.configuration.Configuration sparkConfiguration = new HadoopConfiguration();
    private boolean workersSet = false;

    public SparkGraphComputer(HadoopGraph hadoopGraph) {
        super(hadoopGraph);
        ConfigurationUtils.copy((org.apache.commons.configuration.Configuration)this.hadoopGraph.configuration(), (org.apache.commons.configuration.Configuration)this.sparkConfiguration);
    }

    public GraphComputer workers(int workers) {
        super.workers(workers);
        if (this.sparkConfiguration.containsKey("spark.master") && this.sparkConfiguration.getString("spark.master").startsWith("local")) {
            this.sparkConfiguration.setProperty("spark.master", (Object)("local[" + this.workers + "]"));
        }
        this.workersSet = true;
        return this;
    }

    public GraphComputer configure(String key, Object value) {
        this.sparkConfiguration.setProperty(key, value);
        return this;
    }

    protected void validateStatePriorToExecution() {
        super.validateStatePriorToExecution();
        if (this.sparkConfiguration.containsKey("gremlin.spark.graphInputRDD") && this.sparkConfiguration.containsKey("gremlin.hadoop.graphInputFormat")) {
            this.logger.warn("Both gremlin.spark.graphInputRDD and gremlin.hadoop.graphInputFormat were specified, ignoring gremlin.hadoop.graphInputFormat");
        }
        if (this.sparkConfiguration.containsKey("gremlin.spark.graphOutputRDD") && this.sparkConfiguration.containsKey("gremlin.hadoop.graphOutputFormat")) {
            this.logger.warn("Both gremlin.spark.graphOutputRDD and gremlin.hadoop.graphOutputFormat were specified, ignoring gremlin.hadoop.graphOutputFormat");
        }
    }

    public Future<ComputerResult> submit() {
        this.validateStatePriorToExecution();
        return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, (String)"SparkSubmitter");
    }

    private Future<ComputerResult> submitWithExecutor(Executor exec) {
        HadoopConfiguration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
        apacheConfiguration.setProperty("gremlin.hadoop.graphInputFormat.hasEdges", (Object)this.persist.equals((Object)GraphComputer.Persist.EDGES));
        Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration((org.apache.commons.configuration.Configuration)apacheConfiguration);
        if (hadoopConfiguration.get("gremlin.spark.graphInputRDD", null) == null && hadoopConfiguration.get("gremlin.hadoop.graphInputFormat", null) != null && FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphInputFormat", InputFormat.class))) {
            try {
                String inputLocation = FileSystem.get((Configuration)hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get("gremlin.hadoop.inputLocation"))).getPath().toString();
                apacheConfiguration.setProperty("mapreduce.input.fileinputformat.inputdir", (Object)inputLocation);
                hadoopConfiguration.set("mapreduce.input.fileinputformat.inputdir", inputLocation);
            }
            catch (IOException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
        return CompletableFuture.supplyAsync(() -> this.lambda$submitWithExecutor$1(hadoopConfiguration, (org.apache.commons.configuration.Configuration)apacheConfiguration), exec);
    }

    private void loadJars(JavaSparkContext sparkContext, Configuration hadoopConfiguration) {
        if (hadoopConfiguration.getBoolean("gremlin.hadoop.jarsInDistributedCache", true)) {
            String hadoopGremlinLocalLibs;
            String string = hadoopGremlinLocalLibs = null == System.getProperty("HADOOP_GREMLIN_LIBS") ? System.getenv("HADOOP_GREMLIN_LIBS") : System.getProperty("HADOOP_GREMLIN_LIBS");
            if (null == hadoopGremlinLocalLibs) {
                this.logger.warn("HADOOP_GREMLIN_LIBS is not set -- proceeding regardless");
            } else {
                String[] paths;
                for (String path : paths = hadoopGremlinLocalLibs.split(":")) {
                    File file = new File(path);
                    if (file.exists()) {
                        Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(".jar")).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
                        continue;
                    }
                    this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
                }
            }
        }
    }

    private void updateLocalConfiguration(JavaSparkContext sparkContext, SparkConf sparkConfiguration) {
        String[] validPropertyNames;
        for (String propertyName : validPropertyNames = new String[]{"spark.job.description", "spark.jobGroup.id", "spark.job.interruptOnCancel", "spark.scheduler.pool"}) {
            if (!sparkConfiguration.contains(propertyName)) continue;
            String propertyValue = sparkConfiguration.get(propertyName);
            this.logger.info("Setting Thread Local SparkContext Property - " + propertyName + " : " + propertyValue);
            sparkContext.setLocalProperty(propertyName, sparkConfiguration.get(propertyName));
        }
    }

    public static void main(String[] args) throws Exception {
        PropertiesConfiguration configuration = new PropertiesConfiguration(args[0]);
        new SparkGraphComputer(HadoopGraph.open((org.apache.commons.configuration.Configuration)configuration)).program(VertexProgram.createVertexProgram((Graph)HadoopGraph.open((org.apache.commons.configuration.Configuration)configuration), (org.apache.commons.configuration.Configuration)configuration)).submit().get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private /* synthetic */ ComputerResult lambda$submitWithExecutor$1(Configuration hadoopConfiguration, org.apache.commons.configuration.Configuration apacheConfiguration) {
        long startTime = System.currentTimeMillis();
        FileSystemStorage fileSystemStorage = FileSystemStorage.open((Configuration)hadoopConfiguration);
        SparkContextStorage sparkContextStorage = SparkContextStorage.open(apacheConfiguration);
        boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.spark.graphInputRDD", Object.class));
        boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.hadoop.graphOutputFormat", Object.class));
        boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass("gremlin.spark.graphOutputRDD", Object.class));
        SparkMemory memory = null;
        String outputLocation = hadoopConfiguration.get("gremlin.hadoop.outputLocation", null);
        if (null != outputLocation) {
            if (outputToHDFS && fileSystemStorage.exists(outputLocation)) {
                fileSystemStorage.rm(outputLocation);
            }
            if (outputToSpark && sparkContextStorage.exists(outputLocation)) {
                sparkContextStorage.rm(outputLocation);
            }
        }
        SparkConf sparkConfiguration = new SparkConf();
        sparkConfiguration.setAppName("HadoopGremlin(Spark): " + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
        hadoopConfiguration.forEach(entry -> sparkConfiguration.set((String)entry.getKey(), (String)entry.getValue()));
        try {
            MapMemory finalMemory;
            boolean computedGraphCreated;
            JavaPairRDD<Object, VertexWritable> loadedGraphRDD;
            JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate((SparkConf)sparkConfiguration));
            this.loadJars(sparkContext, hadoopConfiguration);
            Spark.create(sparkContext.sc());
            this.updateLocalConfiguration(sparkContext, sparkConfiguration);
            JavaPairRDD<Object, VertexWritable> computedGraphRDD = null;
            boolean partitioned = false;
            try {
                loadedGraphRDD = ((InputRDD)hadoopConfiguration.getClass("gremlin.spark.graphInputRDD", InputFormatRDD.class, InputRDD.class).newInstance()).readGraphRDD(apacheConfiguration, sparkContext);
                if (loadedGraphRDD.partitioner().isPresent()) {
                    this.logger.info("Using the existing partitioner associated with the loaded graphRDD: " + loadedGraphRDD.partitioner().get());
                } else {
                    loadedGraphRDD = loadedGraphRDD.partitionBy((Partitioner)new HashPartitioner(this.workersSet ? this.workers : loadedGraphRDD.partitions().size()));
                    partitioned = true;
                }
                assert (loadedGraphRDD.partitioner().isPresent());
                if (this.workersSet) {
                    if (loadedGraphRDD.partitions().size() > this.workers) {
                        loadedGraphRDD = loadedGraphRDD.coalesce(this.workers);
                    } else if (loadedGraphRDD.partitions().size() < this.workers) {
                        loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
                    }
                }
                if (!inputFromSpark || partitioned) {
                    loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString((String)hadoopConfiguration.get("gremlin.spark.graphStorageLevel", "MEMORY_ONLY")));
                }
            }
            catch (IllegalAccessException | InstantiationException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            JavaPairRDD viewIncomingRDD = null;
            if (null != this.vertexProgram) {
                memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                this.vertexProgram.setup((Memory)memory);
                memory.broadcastMemory(sparkContext);
                HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
                this.vertexProgram.storeState((org.apache.commons.configuration.Configuration)vertexProgramConfiguration);
                ConfigurationUtils.copy((org.apache.commons.configuration.Configuration)vertexProgramConfiguration, (org.apache.commons.configuration.Configuration)apacheConfiguration);
                ConfUtil.mergeApacheIntoHadoopConfiguration((org.apache.commons.configuration.Configuration)vertexProgramConfiguration, (Configuration)hadoopConfiguration);
                while (true) {
                    memory.setInTask(true);
                    viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, (org.apache.commons.configuration.Configuration)vertexProgramConfiguration);
                    memory.setInTask(false);
                    if (this.vertexProgram.terminate((Memory)memory)) break;
                    memory.incrIteration();
                    memory.broadcastMemory(sparkContext);
                }
                String[] elementComputeKeys = this.vertexProgram == null ? new String[]{} : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
                computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, elementComputeKeys);
                if (!(hadoopConfiguration.get("gremlin.hadoop.graphOutputFormat", null) == null && hadoopConfiguration.get("gremlin.spark.graphOutputRDD", null) == null || this.persist.equals((Object)GraphComputer.Persist.NOTHING))) {
                    try {
                        ((OutputRDD)hadoopConfiguration.getClass("gremlin.spark.graphOutputRDD", OutputFormatRDD.class, OutputRDD.class).newInstance()).writeGraphRDD(apacheConfiguration, computedGraphRDD);
                    }
                    catch (IllegalAccessException | InstantiationException e) {
                        throw new IllegalStateException(e.getMessage(), e);
                    }
                }
            }
            boolean bl = computedGraphCreated = computedGraphRDD != null;
            if (!computedGraphCreated) {
                computedGraphRDD = loadedGraphRDD;
            }
            MapMemory mapMemory = finalMemory = null == memory ? new MapMemory() : new MapMemory((Memory)memory);
            if (!this.mapReducers.isEmpty()) {
                if (computedGraphCreated && !outputToSpark) {
                    computedGraphRDD = computedGraphRDD.mapValues((Function & Serializable)vertexWritable -> {
                        vertexWritable.get().dropEdges();
                        return vertexWritable;
                    });
                    if (this.mapReducers.size() > 1) {
                        computedGraphRDD = computedGraphRDD.persist(StorageLevel.fromString((String)hadoopConfiguration.get("gremlin.spark.graphStorageLevel", "MEMORY_ONLY")));
                    }
                }
                for (MapReduce mapReduce : this.mapReducers) {
                    HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
                    mapReduce.storeState((org.apache.commons.configuration.Configuration)newApacheConfiguration);
                    JavaPairRDD mapRDD = SparkExecutor.executeMap(computedGraphRDD, mapReduce, (org.apache.commons.configuration.Configuration)newApacheConfiguration);
                    JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE) ? SparkExecutor.executeCombine(mapRDD, (org.apache.commons.configuration.Configuration)newApacheConfiguration) : mapRDD;
                    JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(combineRDD, mapReduce, (org.apache.commons.configuration.Configuration)newApacheConfiguration) : combineRDD;
                    try {
                        mapReduce.addResultToMemory((Memory.Admin)finalMemory, ((OutputRDD)hadoopConfiguration.getClass("gremlin.spark.graphOutputRDD", OutputFormatRDD.class, OutputRDD.class).newInstance()).writeMemoryRDD(apacheConfiguration, mapReduce.getMemoryKey(), reduceRDD));
                    }
                    catch (IllegalAccessException | InstantiationException e) {
                        throw new IllegalStateException(e.getMessage(), e);
                    }
                }
            }
            if ((!inputFromSpark || partitioned) && computedGraphCreated) {
                loadedGraphRDD.unpersist();
            }
            if (!outputToSpark || this.persist.equals((Object)GraphComputer.Persist.NOTHING)) {
                computedGraphRDD.unpersist();
            }
            if (null != outputLocation && this.persist.equals((Object)GraphComputer.Persist.NOTHING)) {
                if (outputToHDFS) {
                    fileSystemStorage.rm(outputLocation);
                }
                if (outputToSpark) {
                    sparkContextStorage.rm(outputLocation);
                }
            }
            finalMemory.setRuntime(System.currentTimeMillis() - startTime);
            DefaultComputerResult defaultComputerResult = new DefaultComputerResult((Graph)InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
            return defaultComputerResult;
        }
        finally {
            if (!apacheConfiguration.getBoolean("gremlin.spark.persistContext", false)) {
                Spark.close();
            }
        }
    }
}

