/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.watermark;

import java.util.BitSet;
import java.util.function.Consumer;
import org.apache.flink.api.common.watermark.Watermark;
import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner;

/**
 * A {@link WatermarkCombiner} that holds back watermarks until every input channel has
 * delivered one, then forwards a single watermark and starts a new round.
 *
 * <p>A bit per channel index records which channels have reported in the current round.
 * Repeated watermarks from the same channel within a round only set the same bit again and
 * therefore do not bring the round closer to completion.
 *
 * <p>This class performs no synchronization of its own.
 */
public class AlignedWatermarkCombiner implements WatermarkCombiner {

    /** Number of distinct channels that must report before a watermark is forwarded. */
    private final int numberOfInputChannels;

    /** Bit {@code i} is set once channel {@code i} has delivered a watermark in this round. */
    private final BitSet hasReceiveWatermarks;

    /** Callback run after a round completes and the combined watermark has been emitted. */
    private final Runnable gateResumer;

    /**
     * Creates a combiner for the given number of channels.
     *
     * @param numberOfInputChannels how many distinct channel indexes must report before a
     *     watermark is forwarded; also used as the initial size of the tracking bit set
     * @param gateResumer callback run each time a round completes; presumably resumes the
     *     input gate that was blocked for alignment (NOTE(review): confirm against caller)
     */
    public AlignedWatermarkCombiner(int numberOfInputChannels, Runnable gateResumer) {
        this.numberOfInputChannels = numberOfInputChannels;
        this.hasReceiveWatermarks = new BitSet(numberOfInputChannels);
        this.gateResumer = gateResumer;
    }

    /**
     * Records that {@code channelIndex} has delivered a watermark and, if that completes the
     * round, emits the watermark, resets the round, and runs the resume callback.
     *
     * <p>The watermark passed to {@code watermarkEmitter} is the argument of the call that
     * completed the round; watermarks from earlier calls in the round are not retained.
     *
     * @param watermark the watermark received on the channel
     * @param channelIndex index of the channel the watermark arrived on; NOTE(review): the
     *     index is not range-checked here, callers are assumed to pass
     *     {@code 0 <= channelIndex < numberOfInputChannels}
     * @param watermarkEmitter receives the watermark once all channels have reported
     * @throws Exception declared by the overridden method signature
     */
    @Override
    public void combineWatermark(
            Watermark watermark, int channelIndex, Consumer<Watermark> watermarkEmitter)
            throws Exception {
        // Setting an already-set bit is a no-op, so duplicates from one channel are harmless.
        this.hasReceiveWatermarks.set(channelIndex);

        if (this.hasReceiveWatermarks.cardinality() == this.numberOfInputChannels) {
            // Order matters: emit first, then reset the round, then run the resume callback.
            watermarkEmitter.accept(watermark);
            this.hasReceiveWatermarks.clear();
            this.gateResumer.run();
        }
    }
}

