/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.impl.pipeline.Planner;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import java.util.List;
import javax.annotation.Nonnull;

public class AggregateTransform<A, R>
extends AbstractTransform {
    public static final String FIRST_STAGE_VERTEX_NAME_SUFFIX = "-prepare";
    @Nonnull
    private final AggregateOperation<A, ? extends R> aggrOp;

    public AggregateTransform(@Nonnull List<Transform> upstream, @Nonnull AggregateOperation<A, ? extends R> aggrOp) {
        super(AggregateTransform.createName(upstream), upstream);
        this.aggrOp = aggrOp;
    }

    private static String createName(@Nonnull List<Transform> upstream) {
        return upstream.size() == 1 ? "aggregate" : upstream.size() + "-way co-aggregate";
    }

    @Override
    public void addToDag(Planner p) {
        if (this.aggrOp.combineFn() == null) {
            this.addToDagSingleStage(p);
        } else {
            this.addToDagTwoStage(p);
        }
    }

    private void addToDagSingleStage(Planner p) {
        Planner.PlannerVertex pv = p.addVertex((Transform)this, this.name(), 1, Processors.aggregateP(this.aggrOp));
        p.addEdges((Transform)this, pv.v, edge -> edge.distributed().allToOne(this.name().hashCode()));
    }

    private void addToDagTwoStage(Planner p) {
        String vertexName = this.name();
        Vertex v1 = p.dag.newVertex(vertexName + FIRST_STAGE_VERTEX_NAME_SUFFIX, Processors.accumulateP(this.aggrOp)).localParallelism(this.localParallelism());
        Planner.PlannerVertex pv2 = p.addVertex((Transform)this, vertexName, 1, Processors.combineP(this.aggrOp));
        p.addEdges(this, v1);
        p.dag.edge(Edge.between(v1, pv2.v).distributed().allToOne(this.name().hashCode()));
    }
}

