/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.function.ComparatorEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.impl.pipeline.Planner;
import com.hazelcast.jet.impl.pipeline.transform.AbstractTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class SortTransform<T>
extends AbstractTransform {
    private static final String COLLECT_STAGE_SUFFIX = "-collect";
    private final ComparatorEx<? super T> comparator;

    public SortTransform(@Nonnull Transform upstream, @Nullable ComparatorEx<? super T> comparator) {
        super("sort", upstream);
        this.comparator = comparator == null ? ComparatorEx.naturalOrder() : comparator;
    }

    @Override
    public void addToDag(Planner p) {
        String vertexName = this.name();
        Vertex v1 = p.dag.newVertex(vertexName, Processors.sortP(this.comparator)).localParallelism(this.localParallelism());
        Planner.PlannerVertex pv2 = p.addVertex((Transform)this, vertexName + COLLECT_STAGE_SUFFIX, 1, ProcessorMetaSupplier.forceTotalParallelismOne(ProcessorSupplier.of(Processors.mapP(FunctionEx.identity())), vertexName));
        p.addEdges(this, v1);
        p.dag.edge(Edge.between(v1, pv2.v).distributed().allToOne(vertexName).ordered(this.comparator));
    }
}

