/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tinkerpop.gremlin.spark.process.computer;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;

public final class SparkMessenger<M>
implements Messenger<M> {
    private Vertex vertex;
    private Iterable<M> incomingMessages;
    private List<Tuple2<Object, M>> outgoingMessages = new ArrayList<Tuple2<Object, M>>();

    public void setVertexAndIncomingMessages(Vertex vertex, Iterable<M> incomingMessages) {
        this.vertex = vertex;
        this.incomingMessages = incomingMessages;
        this.outgoingMessages = new ArrayList<Tuple2<Object, M>>();
    }

    public List<Tuple2<Object, M>> getOutgoingMessages() {
        return this.outgoingMessages;
    }

    public Iterator<M> receiveMessages() {
        return IteratorUtils.removeOnNext(this.incomingMessages.iterator());
    }

    public void sendMessage(MessageScope messageScope, M message) {
        if (messageScope instanceof MessageScope.Local) {
            MessageScope.Local localMessageScope = (MessageScope.Local)messageScope;
            Object incidentTraversal = SparkMessenger.setVertexStart((Traversal<Vertex, Edge>)((Traversal)localMessageScope.getIncidentTraversal().get()), this.vertex);
            Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal);
            incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2(((Vertex)edge.vertices(direction).next()).id(), message)));
        } else {
            ((MessageScope.Global)messageScope).vertices().forEach(v -> this.outgoingMessages.add(new Tuple2(v.id(), message)));
        }
    }

    private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(Traversal<Vertex, Edge> incidentTraversal, Vertex vertex) {
        incidentTraversal.asAdmin().addStep(0, (Step)new StartStep(incidentTraversal.asAdmin(), (Object)vertex));
        return (T)((Traversal.Admin)incidentTraversal);
    }

    private static Direction getOppositeDirection(Traversal.Admin<Vertex, Edge> incidentTraversal) {
        VertexStep step = (VertexStep)TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
        return step.getDirection().opposite();
    }
}

