/*
 * Decompiled with CFR 0.152.
 */
package org.apache.edgent.oplet.window;

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.edgent.function.BiConsumer;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.function.Functions;
import org.apache.edgent.oplet.OpletContext;
import org.apache.edgent.oplet.core.Pipe;
import org.apache.edgent.window.Window;

public class Aggregate<T, U, K>
extends Pipe<T, U> {
    private static final long serialVersionUID = 1L;
    private final Window<T, K, ? extends List<T>> window;
    private final BiFunction<List<T>, K, U> aggregator;

    public Aggregate(Window<T, K, ? extends List<T>> window, BiFunction<List<T>, K, U> aggregator) {
        this.aggregator = aggregator;
        BiConsumer & Serializable partProcessor = (BiConsumer & Serializable)(tuples, key) -> {
            Object aggregateTuple = aggregator.apply(tuples, key);
            if (aggregateTuple != null) {
                this.submit(aggregateTuple);
            }
        };
        window.registerPartitionProcessor((BiConsumer)partProcessor);
        this.window = window;
    }

    @Override
    public void initialize(OpletContext<T, U> context) {
        super.initialize(context);
        this.window.registerScheduledExecutorService(this.getOpletContext().getService(ScheduledExecutorService.class));
    }

    public void accept(T tuple) {
        this.window.insert(tuple);
    }

    @Override
    public void close() throws Exception {
        Functions.closeFunction(this.aggregator);
    }
}

