/*
 * Decompiled with CFR 0.152.
 */
package org.apache.edgent.oplet.plumbing;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.oplet.OpletContext;
import org.apache.edgent.oplet.core.FanIn;

/**
 * Fan-in oplet that waits until a tuple is available on every input port and
 * then submits those tuples downstream as a single list.
 *
 * <p>Each input port is backed by its own bounded {@link LinkedBlockingQueue}.
 * Tuples arriving on a port are queued by {@link #accept(Object, int)}; a
 * dedicated thread removes one tuple from each queue, in input port index
 * order, and submits the resulting list. Element {@code i} of a submitted
 * list is therefore the tuple received on input port {@code i}.
 *
 * @param <T> type of the tuples received on the input ports
 */
public class Barrier<T>
extends FanIn<T, List<T>> {
    /** Capacity of each per-input-port queue. */
    private final int queueCapacity;
    /** Thread that assembles and submits the lists; created in {@link #initialize}. */
    private Thread thread;
    /** One queue per input port, indexed by input port index. */
    private List<LinkedBlockingQueue<T>> iportQueues;

    /**
     * Creates a barrier whose input ports each buffer up to
     * {@code queueCapacity} tuples.
     *
     * <p>The value is not validated here; it is passed to the
     * {@link LinkedBlockingQueue} constructor during {@link #initialize},
     * which rejects non-positive capacities with an
     * {@link IllegalArgumentException}.
     *
     * @param queueCapacity maximum number of pending tuples per input port
     */
    public Barrier(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    /**
     * Obtains the worker thread from the context's {@link ThreadFactory}
     * service, allocates one bounded queue per input port and installs the
     * receiver returned by {@link #receiver()}.
     *
     * @param context the oplet context supplied to this oplet
     */
    @Override
    public void initialize(OpletContext<T, List<T>> context) {
        super.initialize(context);
        this.thread = context.getService(ThreadFactory.class).newThread(this::run);
        int numIports = this.getOpletContext().getInputCount();
        this.iportQueues = new ArrayList<>(numIports);
        for (int i = 0; i < numIports; ++i) {
            this.iportQueues.add(new LinkedBlockingQueue<>(this.queueCapacity));
        }
        this.setReceiver(this.receiver());
    }

    /** Starts the thread that assembles and submits the tuple lists. */
    @Override
    public void start() {
        this.thread.start();
    }

    /**
     * Builds the function installed as this oplet's receiver.
     *
     * <p>The function queues the tuple through {@link #accept(Object, int)}
     * and always returns {@code null}; lists are submitted from the worker
     * thread instead. NOTE(review): presumably the superclass treats a
     * {@code null} result as "nothing to submit" - confirm against FanIn.
     *
     * @return a serializable function taking the tuple and its input port index
     */
    protected BiFunction<T, Integer, List<T>> receiver() {
        // The parameterized intersection cast keeps the lambda serializable
        // while letting the compiler check the call to accept(T, int).
        return (BiFunction<T, Integer, List<T>> & Serializable) (tuple, iportIndex) -> {
            this.accept(tuple, iportIndex);
            return null;
        };
    }

    /**
     * Queues a tuple received on the given input port, blocking while that
     * port's queue is full.
     *
     * @param tuple the received tuple
     * @param iportIndex index of the input port the tuple arrived on
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         calling thread is interrupted while waiting for queue space; the
     *         thread's interrupt status is restored before throwing
     */
    protected void accept(T tuple, int iportIndex) {
        try {
            this.iportQueues.get(iportIndex).put(tuple);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Worker loop: repeatedly takes one tuple from every input port queue, in
     * port order, and submits them as a list. Returns once the thread is
     * interrupted.
     */
    private void run() {
        while (!Thread.interrupted()) {
            try {
                List<T> list = new ArrayList<>(this.iportQueues.size());
                for (LinkedBlockingQueue<T> iport : this.iportQueues) {
                    // Blocks until this port has a tuple available.
                    list.add(iport.take());
                }
                this.submit(list);
            }
            catch (InterruptedException e) {
                // Interruption is the stop signal; tuples already taken for the
                // partially built list are dropped.
                break;
            }
        }
    }

    /**
     * Interrupts the worker thread so that it stops waiting on the input
     * queues and leaves its loop. Does nothing if {@link #initialize} was
     * never called.
     */
    @Override
    public void close() {
        Thread worker = this.thread;
        if (worker != null) {
            // Without this the worker stays blocked in take() unless something
            // outside this class interrupts it.
            worker.interrupt();
        }
    }
}

