/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.watermark;

import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.flink.api.common.watermark.LongWatermark;
import org.apache.flink.api.common.watermark.Watermark;
import org.apache.flink.api.common.watermark.WatermarkCombinationFunction;
import org.apache.flink.api.common.watermark.WatermarkCombinationPolicy;
import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner;
import org.apache.flink.streaming.runtime.watermarkstatus.HeapPriorityQueue;
import org.apache.flink.util.Preconditions;

/**
 * A {@link WatermarkCombiner} that merges {@link LongWatermark}s arriving on several input
 * channels into a single combined {@code long} value and forwards that value downstream whenever
 * it changes.
 *
 * <p>Each channel owns one {@link LongWatermarkElement}; all elements live in a
 * {@link HeapPriorityQueue} whose ordering depends on the configured numeric combination
 * function: ascending by value for {@code MIN}, descending otherwise. The element returned by
 * {@code peek()} is used as the combined watermark.
 *
 * <p>NOTE(review): the class holds unsynchronized mutable state, so it presumably has to be
 * driven from a single thread - confirm against the caller.
 */
public class LongWatermarkCombiner implements WatermarkCombiner {
    /** Policy providing the combination function and the "wait for all channels" switch. */
    private final WatermarkCombinationPolicy combinationPolicy;

    /** Number of input channels this combiner was created for. */
    private final int numberOfInputChannels;

    /** Bit {@code i} is set once a watermark has been received from channel {@code i}. */
    private final BitSet hasReceiveWatermarks;

    /** Channel index to the element holding the latest value received on that channel. */
    private final Map<Integer, LongWatermarkElement> channelWatermarks;

    /** Priority queue over all channel elements; its head is the combined watermark. */
    private final HeapPriorityQueue<LongWatermarkElement> orderedChannelWatermarks;

    /** Comparator matching the combination function (ascending for MIN, descending otherwise). */
    private final HeapPriorityQueue.PriorityComparator<LongWatermarkElement> watermarkComparator;

    /**
     * Holds the value most recently handed to the emitter. The initial {@code -1} is only a
     * placeholder: {@link #isFirstTimeEmit} forces the first emission regardless of it.
     */
    private final LongWatermarkElement previousEmitWatermarkElement = new LongWatermarkElement(-1L);

    /** {@code true} until the first emission check has been performed. */
    private boolean isFirstTimeEmit = true;

    /**
     * Creates a combiner for the given policy and channel count.
     *
     * @param combinationPolicy policy whose combination function must be a
     *     {@link WatermarkCombinationFunction.NumericWatermarkCombinationFunction}
     * @param numberOfInputChannels number of input channels; valid channel indices are
     *     {@code 0} (inclusive) to {@code numberOfInputChannels} (exclusive)
     * @throws IllegalStateException presumably raised by {@code Preconditions.checkState} when
     *     the combination function is not numeric - confirm against Flink's Preconditions
     */
    public LongWatermarkCombiner(WatermarkCombinationPolicy combinationPolicy, int numberOfInputChannels) {
        Preconditions.checkState(
                combinationPolicy.getWatermarkCombinationFunction()
                        instanceof WatermarkCombinationFunction.NumericWatermarkCombinationFunction);
        this.combinationPolicy = combinationPolicy;
        this.numberOfInputChannels = numberOfInputChannels;
        this.hasReceiveWatermarks = new BitSet(numberOfInputChannels);
        this.channelWatermarks = new HashMap<>(numberOfInputChannels);

        // Channels that have not reported yet start at the value that can never win the
        // comparison: Long.MAX_VALUE when the smallest value is at the head, Long.MIN_VALUE
        // when the largest value is at the head.
        final long initValue;
        if (combinationPolicy.getWatermarkCombinationFunction()
                == WatermarkCombinationFunction.NumericWatermarkCombinationFunction.MIN) {
            this.watermarkComparator =
                    (left, right) -> Long.compare(left.getWatermarkValue(), right.getWatermarkValue());
            initValue = Long.MAX_VALUE;
        } else {
            this.watermarkComparator =
                    (left, right) -> Long.compare(right.getWatermarkValue(), left.getWatermarkValue());
            initValue = Long.MIN_VALUE;
        }

        this.orderedChannelWatermarks =
                new HeapPriorityQueue<>(this.watermarkComparator, numberOfInputChannels);
        for (int channel = 0; channel < numberOfInputChannels; ++channel) {
            LongWatermarkElement element = new LongWatermarkElement(initValue);
            this.channelWatermarks.put(channel, element);
            this.orderedChannelWatermarks.add(element);
        }
    }

    /**
     * Records the watermark received on {@code channelIndex} and, if the combined value should
     * be published, passes a new {@link LongWatermark} carrying the combined value and the
     * incoming watermark's identifier to {@code watermarkEmitter}.
     *
     * <p>Nothing is emitted while the policy asks to wait for all channels and at least one
     * channel has not delivered a watermark yet.
     *
     * @param watermark the received watermark; must be a {@link LongWatermark}
     * @param channelIndex index of the channel the watermark arrived on
     * @param watermarkEmitter receives the combined watermark when it is emitted
     * @throws IndexOutOfBoundsException if {@code channelIndex} is negative or not smaller than
     *     the number of input channels
     */
    @Override
    public void combineWatermark(Watermark watermark, int channelIndex, Consumer<Watermark> watermarkEmitter) {
        Preconditions.checkState(watermark instanceof LongWatermark);
        // Validate before touching any state: BitSet.set silently grows for indices beyond the
        // channel count, which would inflate cardinality() and then fail with an opaque
        // NullPointerException on the map lookup below.
        if (channelIndex < 0 || channelIndex >= this.numberOfInputChannels) {
            throw new IndexOutOfBoundsException(
                    "channelIndex " + channelIndex + " is out of range [0, " + this.numberOfInputChannels + ")");
        }

        this.hasReceiveWatermarks.set(channelIndex);
        LongWatermarkElement channelElement = this.channelWatermarks.get(channelIndex);
        channelElement.setWatermarkValue(((LongWatermark) watermark).getValue());
        // The element's value changed in place, so the queue has to re-position it.
        this.orderedChannelWatermarks.adjustModifiedElement(channelElement);

        boolean stillWaitingForChannels =
                this.combinationPolicy.isCombineWaitForAllChannels()
                        && this.hasReceiveWatermarks.cardinality() < this.numberOfInputChannels;
        if (stillWaitingForChannels) {
            return;
        }

        LongWatermarkElement combinedElement = this.orderedChannelWatermarks.peek();
        if (this.shouldEmitWatermark(combinedElement)) {
            // Capture the value once so that exactly the emitted value is remembered.
            long combinedValue = combinedElement.getWatermarkValue();
            watermarkEmitter.accept(new LongWatermark(combinedValue, watermark.getIdentifier()));
            this.previousEmitWatermarkElement.setWatermarkValue(combinedValue);
        }
    }

    /**
     * Decides whether the combined watermark has to be emitted: always on the first check,
     * afterwards only when its value differs from the previously emitted one.
     *
     * <p>Side effect: the first call clears {@link #isFirstTimeEmit}.
     */
    private boolean shouldEmitWatermark(LongWatermarkElement combinedWatermarkElement) {
        if (this.isFirstTimeEmit) {
            this.isFirstTimeEmit = false;
            return true;
        }
        return this.watermarkComparator.comparePriority(combinedWatermarkElement, this.previousEmitWatermarkElement) != 0;
    }

    /**
     * Mutable holder of a {@code long} watermark value that can be stored in a
     * {@link HeapPriorityQueue}; it keeps the internal index the queue assigns to it.
     */
    protected static class LongWatermarkElement
            implements HeapPriorityQueue.HeapPriorityQueueElement {
        /** Current watermark value of this element. */
        private long watermarkValue;

        /** Index maintained through {@link #setInternalIndex(int)}; Integer.MIN_VALUE until set. */
        private int heapIndex = Integer.MIN_VALUE;

        /**
         * @param watermarkValue initial watermark value
         */
        public LongWatermarkElement(long watermarkValue) {
            this.watermarkValue = watermarkValue;
        }

        @Override
        public int getInternalIndex() {
            return this.heapIndex;
        }

        @Override
        public void setInternalIndex(int newIndex) {
            this.heapIndex = newIndex;
        }

        /**
         * Overwrites the stored value. This does not notify any queue holding the element; in
         * this file the caller re-positions it via {@code adjustModifiedElement} afterwards.
         */
        public void setWatermarkValue(long watermarkValue) {
            this.watermarkValue = watermarkValue;
        }

        /** Returns the stored watermark value. */
        public long getWatermarkValue() {
            return this.watermarkValue;
        }
    }
}

