/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.function.DistributedFunction;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nonnull;

public class CoGroupP<K, A, R>
extends AbstractProcessor {
    private final List<DistributedFunction<?, ? extends K>> groupKeyFs;
    private final AggregateOperation<A, R> aggrOp;
    private final Map<K, A> keyToAcc = new HashMap<K, A>();
    private final Traverser<Map.Entry<K, R>> resultTraverser;

    public CoGroupP(@Nonnull List<DistributedFunction<?, ? extends K>> groupKeyFs, @Nonnull AggregateOperation<A, R> aggrOp) {
        this.groupKeyFs = groupKeyFs;
        this.aggrOp = aggrOp;
        this.resultTraverser = Traversers.traverseStream(this.keyToAcc.entrySet().stream().map(e -> Util.entry(e.getKey(), this.aggrOp.finishFn().apply(e.getValue()))));
    }

    public <T> CoGroupP(@Nonnull DistributedFunction<? super T, ? extends K> groupKeyFn, @Nonnull AggregateOperation1<? super T, A, R> aggrOp) {
        this(Collections.singletonList(groupKeyFn), aggrOp);
    }

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        Function keyFn = this.groupKeyFs.get(ordinal);
        Object key = keyFn.apply(item);
        Object acc = this.keyToAcc.computeIfAbsent(key, k -> this.aggrOp.createFn().get());
        this.aggrOp.accumulateFn(ordinal).accept(acc, item);
        return true;
    }

    @Override
    public boolean complete() {
        return this.emitFromTraverser(this.resultTraverser);
    }
}

