/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.stream.impl.reducers;

import com.hazelcast.jet.DAG;
import com.hazelcast.jet.Edge;
import com.hazelcast.jet.Processor;
import com.hazelcast.jet.Vertex;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.processor.Sinks;
import com.hazelcast.jet.stream.DistributedCollector;
import com.hazelcast.jet.stream.IStreamList;
import com.hazelcast.jet.stream.impl.StreamUtil;
import com.hazelcast.jet.stream.impl.pipeline.Pipeline;
import com.hazelcast.jet.stream.impl.pipeline.StreamContext;
import com.hazelcast.jet.stream.impl.processor.AccumulateP;
import com.hazelcast.jet.stream.impl.processor.CombineP;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

/**
 * Static helpers and {@link DistributedCollector.Reducer} implementations that
 * express a stream reduction as a Jet {@link DAG}: an accumulating vertex fed by
 * the upstream pipeline, a single combining vertex, and a list sink from which
 * the final value is read back.
 */
public final class Reducers {
    private Reducers() {
    }

    /**
     * Adds a {@code "combine"} vertex to {@code dag} and connects {@code accumulate}
     * to it.
     *
     * @param dag        the DAG being assembled
     * @param accumulate the vertex whose output is to be combined
     * @param combiner   the operator handed to the {@link CombineP} processor
     * @return the newly created combine vertex
     */
    private static <T> Vertex buildCombiner(DAG dag, Vertex accumulate, BinaryOperator<T> combiner) {
        DistributedSupplier<Processor> supplier = () -> new CombineP(combiner, DistributedFunction.identity());
        // Local parallelism of 1 plus a distributed all-to-one edge: every
        // accumulator's output is routed to one combiner.
        Vertex combine = dag.newVertex("combine", supplier).localParallelism(1);
        dag.edge(Edge.between(accumulate, combine).distributed().allToOne());
        return combine;
    }

    /**
     * Attaches a list sink after {@code combiner}, runs the job, and reads the
     * first element written to the list.
     *
     * <p>The list has a unique name obtained from {@link StreamUtil#uniqueListName()}
     * and is destroyed before this method returns, whether the job succeeded or not.
     *
     * @param context  the stream context supplying the Jet instance and used to run the job
     * @param dag      the DAG to extend with the sink and then execute
     * @param combiner the vertex whose output is written to the list
     * @return the first element of the result list, or an empty {@code Optional}
     *         if the job wrote nothing to it
     */
    private static <T> Optional<T> execute(StreamContext context, DAG dag, Vertex combiner) {
        String listName = StreamUtil.uniqueListName();
        Vertex writeList = dag.newVertex("write-" + listName, Sinks.writeList(listName));
        dag.edge(Edge.between(combiner, writeList));
        IStreamList<T> list = context.getJetInstance().getList(listName);
        try {
            StreamUtil.executeJob(context, dag);
            if (list.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(list.get(0));
        } finally {
            // Always release the temporary list, including when the job fails.
            list.destroy();
        }
    }

    /**
     * Adds a {@code "reduce"} vertex running {@link AccumulateP} with the given
     * accumulator and identity, builds the upstream pipeline into {@code dag},
     * and connects the upstream's last vertex to the new vertex.
     *
     * @param dag         the DAG being assembled
     * @param upstream    the pipeline producing the items to reduce
     * @param identity    the initial value handed to {@link AccumulateP}
     * @param accumulator the function handed to {@link AccumulateP}
     * @return the reduce vertex
     */
    private static <T, U> Vertex buildMappingAccumulator(DAG dag, Pipeline<? extends T> upstream, U identity, BiFunction<U, ? super T, U> accumulator) {
        Vertex reduce = dag.newVertex("reduce", () -> new AccumulateP(accumulator, identity));
        Vertex previous = upstream.buildDAG(dag);
        // Only wire an edge when the upstream did not hand back the reduce vertex itself.
        if (previous != reduce) {
            dag.edge(Edge.between(previous, reduce));
        }
        return reduce;
    }

    /**
     * Adds the vertex produced by {@link #reduceVertex} to {@code dag}, builds the
     * upstream pipeline into {@code dag}, and connects the upstream's last vertex
     * to the reduce vertex.
     *
     * @param dag         the DAG being assembled
     * @param upstream    the pipeline producing the items to reduce
     * @param accumulator the operator used to fold items together
     * @param identity    the initial value, or {@code null} when there is none
     * @return the reduce vertex
     */
    private static <T> Vertex buildAccumulator(DAG dag, Pipeline<? extends T> upstream, BinaryOperator<T> accumulator, T identity) {
        Vertex reduce = Reducers.reduceVertex(accumulator, identity);
        dag.vertex(reduce);
        Vertex previous = upstream.buildDAG(dag);
        // Only wire an edge when the upstream did not hand back the reduce vertex itself.
        if (previous != reduce) {
            dag.edge(Edge.between(previous, reduce));
        }
        return reduce;
    }

    /**
     * Creates the {@code "reduce"} vertex: backed by {@link AccumulateP} when an
     * identity is supplied, and by {@link CombineP} (with an identity mapping
     * function) when {@code identity} is {@code null}.
     *
     * @param accumulator the operator handed to the processor
     * @param identity    the initial value, or {@code null} when there is none
     * @return a new vertex that has not yet been added to any DAG
     */
    private static <T> Vertex reduceVertex(BinaryOperator<T> accumulator, T identity) {
        return identity != null ? new Vertex("reduce", () -> new AccumulateP(accumulator, identity)) : new Vertex("reduce", () -> new CombineP(accumulator, DistributedFunction.identity()));
    }

    /**
     * Reducer that folds items with a binary operator starting from an identity
     * value and returns the reduced value directly.
     *
     * @param <T> the item and result type
     */
    public static class BinaryAccumulateWithIdentity<T>
    implements DistributedCollector.Reducer<T, T> {
        private final T identity;
        private final BinaryOperator<T> accumulator;

        /**
         * @param identity    the initial value of the reduction
         * @param accumulator the operator used both to accumulate and to combine
         */
        public BinaryAccumulateWithIdentity(T identity, BinaryOperator<T> accumulator) {
            this.identity = identity;
            this.accumulator = accumulator;
        }

        /**
         * Builds an accumulate/combine DAG over {@code upstream}, executes it and
         * returns the result.
         *
         * @param context  the stream context used to run the job
         * @param upstream the pipeline producing the items to reduce
         * @return the reduced value, or the identity if the job produced no result
         */
        @Override
        public T reduce(StreamContext context, Pipeline<? extends T> upstream) {
            DAG dag = new DAG();
            Vertex accumulate = Reducers.buildAccumulator(dag, upstream, this.accumulator, this.identity);
            Vertex combine = Reducers.buildCombiner(dag, accumulate, this.accumulator);
            // Fall back to the identity instead of failing on an empty result,
            // matching java.util.stream.Stream#reduce(Object, BinaryOperator).
            return Reducers.<T>execute(context, dag, combine).orElse(this.identity);
        }
    }

    /**
     * Reducer that folds items with a binary operator and no identity value; the
     * result is wrapped in an {@link Optional}.
     *
     * @param <T> the item and result type
     */
    public static class BinaryAccumulate<T>
    implements DistributedCollector.Reducer<T, Optional<T>> {
        private final BinaryOperator<T> accumulator;

        /**
         * @param accumulator the operator used both to accumulate and to combine
         */
        public BinaryAccumulate(BinaryOperator<T> accumulator) {
            this.accumulator = accumulator;
        }

        /**
         * Builds an accumulate/combine DAG over {@code upstream}, executes it and
         * returns the result.
         *
         * @param context  the stream context used to run the job
         * @param upstream the pipeline producing the items to reduce
         * @return the reduced value, or an empty {@code Optional} if the job
         *         produced no result
         */
        @Override
        public Optional<T> reduce(StreamContext context, Pipeline<? extends T> upstream) {
            DAG dag = new DAG();
            // A null identity selects the identity-less reduce vertex.
            Vertex accumulate = Reducers.buildAccumulator(dag, upstream, this.accumulator, null);
            Vertex combine = Reducers.buildCombiner(dag, accumulate, this.accumulator);
            return Reducers.execute(context, dag, combine);
        }
    }

    /**
     * Reducer that accumulates items of type {@code T} into a value of type
     * {@code U} starting from an identity, and merges partial results with a
     * separate combiner.
     *
     * @param <T> the item type
     * @param <U> the result type
     */
    public static class AccumulateCombineWithIdentity<T, U>
    implements DistributedCollector.Reducer<T, U> {
        private final U identity;
        private final BiFunction<U, ? super T, U> accumulator;
        private final BinaryOperator<U> combiner;

        /**
         * @param identity    the initial value of the reduction
         * @param accumulator the function folding an item into a partial result
         * @param combiner    the operator merging two partial results
         */
        public AccumulateCombineWithIdentity(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
            this.identity = identity;
            this.accumulator = accumulator;
            this.combiner = combiner;
        }

        /**
         * Builds an accumulate/combine DAG over {@code upstream}, executes it and
         * returns the result.
         *
         * @param context  the stream context used to run the job
         * @param upstream the pipeline producing the items to reduce
         * @return the reduced value, or the identity if the job produced no result
         */
        @Override
        public U reduce(StreamContext context, Pipeline<? extends T> upstream) {
            DAG dag = new DAG();
            Vertex accumulate = Reducers.buildMappingAccumulator(dag, upstream, this.identity, this.accumulator);
            Vertex combine = Reducers.buildCombiner(dag, accumulate, this.combiner);
            // Fall back to the identity instead of failing on an empty result,
            // matching java.util.stream.Stream#reduce(Object, BiFunction, BinaryOperator).
            return Reducers.<U>execute(context, dag, combine).orElse(this.identity);
        }
    }
}

