/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.stream.impl.reducers;

import com.hazelcast.jet.DAG;
import com.hazelcast.jet.Edge;
import com.hazelcast.jet.KeyExtractors;
import com.hazelcast.jet.Partitioner;
import com.hazelcast.jet.Processors;
import com.hazelcast.jet.Vertex;
import com.hazelcast.jet.stream.IStreamMap;
import com.hazelcast.jet.stream.impl.StreamUtil;
import com.hazelcast.jet.stream.impl.pipeline.Pipeline;
import com.hazelcast.jet.stream.impl.pipeline.StreamContext;
import com.hazelcast.jet.stream.impl.processor.MergeP;
import com.hazelcast.jet.stream.impl.reducers.IMapReducer;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/**
 * Reducer that collects the items of an upstream pipeline into a map named
 * {@code mapName}, combining values with a caller-supplied merge function.
 *
 * <p>The job built by {@link #reduce} has three vertices after the upstream
 * pipeline: {@code "merge-local"}, {@code "merge-distributed"} and
 * {@code "write-map-<mapName>"}.
 *
 * @param <T> type of the items produced by the upstream pipeline
 * @param <K> type of the keys in the resulting map
 * @param <V> type of the values in the resulting map
 */
public class MergingIMapReducer<T, K, V>
extends IMapReducer<T, K, V> {
    /** Function handed to both {@code MergeP} stages to combine two values. */
    private final BinaryOperator<V> mergeFunction;

    /**
     * Creates a reducer that writes to a map whose name is obtained from
     * {@code StreamUtil.uniqueMapName()}.
     *
     * @param keyMapper     extracts the map key from an upstream item
     * @param valueMapper   extracts the map value from an upstream item
     * @param mergeFunction combines two values
     */
    public MergingIMapReducer(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper, BinaryOperator<V> mergeFunction) {
        this(StreamUtil.uniqueMapName(), keyMapper, valueMapper, mergeFunction);
    }

    /**
     * Creates a reducer that writes to the map with the given name.
     *
     * @param mapName       name of the target map
     * @param keyMapper     extracts the map key from an upstream item
     * @param valueMapper   extracts the map value from an upstream item
     * @param mergeFunction combines two values
     */
    private MergingIMapReducer(String mapName, Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper, BinaryOperator<V> mergeFunction) {
        super(mapName, keyMapper, valueMapper);
        this.mergeFunction = mergeFunction;
    }

    /**
     * Builds the merge DAG on top of {@code upstream}, executes it as a job and
     * returns the target map.
     *
     * @param context  stream context supplying the Jet instance and used to run the job
     * @param upstream pipeline whose items are reduced
     * @return the map obtained from {@code getTarget} before the job was executed
     */
    @Override
    public IStreamMap<K, V> reduce(StreamContext context, Pipeline<? extends T> upstream) {
        // Typed as the method's return type: a local declared as Object cannot
        // be returned from this method.
        // NOTE(review): getTarget is inherited and not visible here; it is
        // assumed to return IStreamMap<K, V> - confirm against IMapReducer.
        IStreamMap<K, V> target = this.getTarget(context.getJetInstance());

        DAG dag = new DAG();
        Vertex previous = upstream.buildDAG(dag);

        Vertex merge = dag.newVertex("merge-local", () -> new MergeP(this.keyMapper, this.valueMapper, this.mergeFunction));
        // The second stage receives null mappers; presumably it consumes
        // entries already produced by the first stage - verify against MergeP.
        Vertex combine = dag.newVertex("merge-distributed", () -> new MergeP(null, null, this.mergeFunction));
        Vertex writer = dag.newVertex("write-map-" + this.mapName, Processors.writeMap(this.mapName));

        dag.edge(Edge.between(previous, merge).partitioned(this.keyMapper::apply, Partitioner.HASH_CODE))
            .edge(Edge.between(merge, combine).distributed().partitioned(KeyExtractors.entryKey()))
            .edge(Edge.between(combine, writer));

        StreamUtil.executeJob(context, dag);
        return target;
    }
}

