/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.stream.impl.reducers;

import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Partitioner;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.function.DistributedBinaryOperator;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.stream.impl.StreamUtil;
import com.hazelcast.jet.stream.impl.pipeline.Pipe;
import com.hazelcast.jet.stream.impl.pipeline.StreamContext;
import com.hazelcast.jet.stream.impl.processor.MergeP;
import com.hazelcast.jet.stream.impl.reducers.SinkReducer;

/**
 * A {@link SinkReducer} whose job runs the upstream items through two
 * {@link MergeP} vertices ("merge-local" and "merge-distributed") before
 * they reach the sink vertex.
 *
 * <p>The first {@code MergeP} receives the key mapper, the value mapper and
 * the merge function; the second one receives only the merge function.
 *
 * @param <T> type of the items produced by the upstream pipe
 * @param <K> type produced by the key mapper
 * @param <V> type produced by the value mapper and accepted by the merge function
 * @param <R> type of the object returned from {@link #reduce}
 */
public class MergingSinkReducer<T, K, V, R> extends SinkReducer<T, K, V, R> {

    /** Operator handed to both {@code MergeP} vertices. */
    private final DistributedBinaryOperator<V> mergeFunction;

    /**
     * Stores the merge function and forwards every other argument to the
     * {@link SinkReducer} constructor unchanged.
     *
     * @param sinkName            forwarded to the superclass; used as the name of the sink vertex
     * @param toDistributedObject forwarded to the superclass; applied to the Jet instance
     *                            to produce the value returned from {@link #reduce}
     * @param keyMapper           forwarded to the superclass; also used to partition the
     *                            edge into the first merge vertex
     * @param valueMapper         forwarded to the superclass; passed to the first merge vertex
     * @param mergeFunction       operator passed to both merge vertices
     * @param metaSupplier        forwarded to the superclass; supplies the sink vertex
     */
    public MergingSinkReducer(
            String sinkName,
            DistributedFunction<JetInstance, ? extends R> toDistributedObject,
            DistributedFunction<? super T, ? extends K> keyMapper,
            DistributedFunction<? super T, ? extends V> valueMapper,
            DistributedBinaryOperator<V> mergeFunction,
            ProcessorMetaSupplier metaSupplier
    ) {
        super(sinkName, toDistributedObject, keyMapper, valueMapper, metaSupplier);
        this.mergeFunction = mergeFunction;
    }

    /**
     * Builds the merge DAG on top of {@code upstream}, executes it as a job
     * and returns the object obtained by applying {@code toDistributedObject}
     * to the context's Jet instance.
     *
     * @param context  stream context the job is executed with
     * @param upstream pipe that contributes the leading part of the DAG
     * @return result of {@code toDistributedObject} for the context's Jet instance
     */
    @Override
    public R reduce(StreamContext context, Pipe<? extends T> upstream) {
        StreamUtil.executeJob(context, buildMergeDag(upstream));
        return toDistributedObject.apply(context.getJetInstance());
    }

    /**
     * Assembles the DAG: upstream -> merge-local -> merge-distributed -> sink.
     */
    private DAG buildMergeDag(Pipe<? extends T> upstream) {
        DAG dag = new DAG();
        Vertex source = upstream.buildDAG(dag);

        // NOTE(review): MergeP is instantiated as a raw type here; if it is declared
        // generic, explicit type arguments would be preferable - confirm against MergeP.
        Vertex localMerge = dag.newVertex(
                "merge-local",
                () -> new MergeP(keyMapper, valueMapper, mergeFunction));
        // The second stage gets no mappers; the edge feeding it is partitioned by
        // entry key, so it presumably consumes already-mapped entries (verify in MergeP).
        Vertex distributedMerge = dag.newVertex(
                "merge-distributed",
                () -> new MergeP(null, null, mergeFunction));
        Vertex sink = dag.newVertex(sinkName, metaSupplier);

        dag
                // upstream items are partitioned by the mapped key using HASH_CODE
                .edge(Edge.between(source, localMerge)
                        .partitioned(keyMapper::apply, Partitioner.HASH_CODE))
                // distributed edge, partitioned by the entry key
                .edge(Edge.between(localMerge, distributedMerge)
                        .distributed()
                        .partitioned(DistributedFunctions.entryKey()))
                // plain edge into the sink vertex
                .edge(Edge.between(distributedMerge, sink));
        return dag;
    }
}

