/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tinkerpop.gremlin.spark.process.computer;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BinaryOperator;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.spark.process.computer.MemoryAccumulator;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;

public final class SparkMemory
implements Memory.Admin,
Serializable {
    public final Map<String, MemoryComputeKey> memoryComputeKeys = new HashMap<String, MemoryComputeKey>();
    private final Map<String, Accumulator<ObjectWritable>> sparkMemory = new HashMap<String, Accumulator<ObjectWritable>>();
    private final AtomicInteger iteration = new AtomicInteger(0);
    private final AtomicLong runtime = new AtomicLong(0L);
    private Broadcast<Map<String, Object>> broadcast;
    private boolean inExecute = false;

    public SparkMemory(VertexProgram<?> vertexProgram, Set<MapReduce> mapReducers, JavaSparkContext sparkContext) {
        if (null != vertexProgram) {
            for (MemoryComputeKey key : vertexProgram.getMemoryComputeKeys()) {
                this.memoryComputeKeys.put(key.getKey(), key);
            }
        }
        for (MapReduce mapReduce : mapReducers) {
            this.memoryComputeKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of((String)mapReduce.getMemoryKey(), (BinaryOperator)Operator.assign, (boolean)false, (boolean)false));
        }
        for (MemoryComputeKey memoryComputeKey : this.memoryComputeKeys.values()) {
            this.sparkMemory.put(memoryComputeKey.getKey(), (Accumulator<ObjectWritable>)sparkContext.accumulator((Object)ObjectWritable.empty(), memoryComputeKey.getKey(), new MemoryAccumulator(memoryComputeKey)));
        }
        this.broadcast = sparkContext.broadcast(Collections.emptyMap());
    }

    public Set<String> keys() {
        if (this.inExecute) {
            return ((Map)this.broadcast.getValue()).keySet();
        }
        HashSet trueKeys = new HashSet();
        this.sparkMemory.forEach((key, value) -> {
            if (!((ObjectWritable)value.value()).isEmpty()) {
                trueKeys.add(key);
            }
        });
        return Collections.unmodifiableSet(trueKeys);
    }

    public void incrIteration() {
        this.iteration.getAndIncrement();
    }

    public void setIteration(int iteration) {
        this.iteration.set(iteration);
    }

    public int getIteration() {
        return this.iteration.get();
    }

    public void setRuntime(long runTime) {
        this.runtime.set(runTime);
    }

    public long getRuntime() {
        return this.runtime.get();
    }

    public <R> R get(String key) throws IllegalArgumentException {
        if (!this.memoryComputeKeys.containsKey(key)) {
            throw Memory.Exceptions.memoryDoesNotExist((String)key);
        }
        if (this.inExecute && !this.memoryComputeKeys.get(key).isBroadcast()) {
            throw Memory.Exceptions.memoryDoesNotExist((String)key);
        }
        ObjectWritable r = (ObjectWritable)(this.inExecute ? ((Map)this.broadcast.value()).get(key) : this.sparkMemory.get(key).value());
        if (null == r || r.isEmpty()) {
            throw Memory.Exceptions.memoryDoesNotExist((String)key);
        }
        return (R)r.get();
    }

    public void add(String key, Object value) {
        this.checkKeyValue(key, value);
        if (!this.inExecute) {
            throw Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute((String)key);
        }
        this.sparkMemory.get(key).add((Object)new ObjectWritable(value));
    }

    public void set(String key, Object value) {
        this.checkKeyValue(key, value);
        if (this.inExecute) {
            throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate((String)key);
        }
        this.sparkMemory.get(key).setValue((Object)new ObjectWritable(value));
    }

    public String toString() {
        return StringFactory.memoryString((Memory)this);
    }

    protected void complete() {
        this.memoryComputeKeys.values().stream().filter(MemoryComputeKey::isTransient).forEach(memoryComputeKey -> this.sparkMemory.remove(memoryComputeKey.getKey()));
    }

    public void setInExecute(boolean inExecute) {
        this.inExecute = inExecute;
    }

    protected void broadcastMemory(JavaSparkContext sparkContext) {
        this.broadcast.destroy(true);
        HashMap toBroadcast = new HashMap();
        this.sparkMemory.forEach((key, object) -> {
            if (!((ObjectWritable)object.value()).isEmpty() && this.memoryComputeKeys.get(key).isBroadcast()) {
                toBroadcast.put(key, object.value());
            }
        });
        this.broadcast = sparkContext.broadcast(toBroadcast);
    }

    private void checkKeyValue(String key, Object value) {
        if (!this.memoryComputeKeys.containsKey(key)) {
            throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey((String)key);
        }
        MemoryHelper.validateValue((Object)value);
    }
}

