package org.apache.flink.streaming.runtime.watermarkstatus;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.watermarkstatus.HeapPriorityQueue;
import org.apache.flink.util.Preconditions;

/**
 * Combines the watermarks and watermark statuses received on several input channels into a
 * single output watermark / watermark status.
 *
 * <p>The valve keeps one {@link InputChannelStatus} per channel. Channels whose watermark counts
 * towards the output watermark ("aligned" channels) are additionally kept in a priority queue
 * ordered by watermark. The output watermark only ever increases: a new value is emitted when the
 * head of that queue moves past the last emitted watermark.
 *
 * <p>NOTE(review): {@code HeapPriorityQueue} is not visible from this file; the comments below
 * assume {@code peek()} returns the element that is smallest under the comparator passed to its
 * constructor - confirm against that class.
 */
@Internal
public class StatusWatermarkValve {

    /** Per-channel state, indexed by channel index. */
    private final InputChannelStatus[] channelStatuses;

    /** The last watermark handed to the output; starts at {@code Long.MIN_VALUE}. */
    private long lastOutputWatermark;

    /** The last watermark status handed to the output; starts as ACTIVE. */
    private WatermarkStatus lastOutputWatermarkStatus;

    /** The currently aligned channels, ordered by their watermark. */
    private final HeapPriorityQueue<InputChannelStatus> alignedChannelStatuses;

    /**
     * Mutable state the valve tracks for one input channel: its last watermark, its watermark
     * status, whether it is aligned, and its slot in the aligned-channel queue.
     */
    @VisibleForTesting
    public static class InputChannelStatus implements HeapPriorityQueue.HeapPriorityQueueElement {

        /** Sentinel stored in {@link #heapIndex} while this element is not in the queue. */
        private static final int NOT_IN_QUEUE = Integer.MIN_VALUE;

        protected long watermark;
        protected WatermarkStatus watermarkStatus;
        protected boolean isWatermarkAligned;
        private int heapIndex = NOT_IN_QUEUE;

        protected InputChannelStatus() {
        }

        /**
         * Tells whether at least one of the given channels has an active watermark status.
         *
         * @param inputChannelStatusArr the channel statuses to inspect
         * @return {@code true} as soon as one active channel is found, {@code false} otherwise
         */
        public static boolean hasActiveChannels(InputChannelStatus[] inputChannelStatusArr) {
            for (InputChannelStatus status : inputChannelStatusArr) {
                if (status.watermarkStatus.isActive()) {
                    return true;
                }
            }
            return false;
        }

        @Override // org.apache.flink.streaming.runtime.watermarkstatus.HeapPriorityQueue.HeapPriorityQueueElement
        public int getInternalIndex() {
            return this.heapIndex;
        }

        @Override // org.apache.flink.streaming.runtime.watermarkstatus.HeapPriorityQueue.HeapPriorityQueueElement
        public void setInternalIndex(int i) {
            this.heapIndex = i;
        }

        /**
         * Removes this element from the given queue and resets its index to the sentinel.
         *
         * <p>Does nothing when the element is not queued. A channel can be active without being
         * aligned (it was re-activated while its watermark was still behind the output
         * watermark); when such a channel turns idle again it must not be handed to
         * {@code remove}, because its index is the sentinel rather than a real queue position.
         *
         * @param heapPriorityQueue the queue to remove this element from
         */
        public void removeFrom(HeapPriorityQueue<InputChannelStatus> heapPriorityQueue) {
            if (this.heapIndex == NOT_IN_QUEUE) {
                return;
            }
            heapPriorityQueue.remove(this);
            setInternalIndex(NOT_IN_QUEUE);
        }

        /**
         * Adds this element to the given queue unless its index says it is already queued.
         *
         * @param heapPriorityQueue the queue to add this element to
         */
        public void addTo(HeapPriorityQueue<InputChannelStatus> heapPriorityQueue) {
            if (this.heapIndex == NOT_IN_QUEUE) {
                heapPriorityQueue.add(this);
            }
        }
    }

    /**
     * Creates a valve for the given number of input channels. Every channel starts active,
     * aligned, and with watermark {@code Long.MIN_VALUE}; the valve itself starts active with
     * output watermark {@code Long.MIN_VALUE}.
     *
     * @param i the number of input channels; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is not positive (assuming the usual contract
     *     of {@code Preconditions.checkArgument})
     */
    public StatusWatermarkValve(int i) {
        Preconditions.checkArgument(i > 0);
        this.channelStatuses = new InputChannelStatus[i];
        this.alignedChannelStatuses =
                new HeapPriorityQueue<>(
                        (left, right) -> Long.compare(left.watermark, right.watermark), i);
        for (int channelIndex = 0; channelIndex < i; channelIndex++) {
            final InputChannelStatus channel = new InputChannelStatus();
            channel.watermark = Long.MIN_VALUE;
            channel.watermarkStatus = WatermarkStatus.ACTIVE;
            this.channelStatuses[channelIndex] = channel;
            markWatermarkAligned(channel);
        }
        this.lastOutputWatermark = Long.MIN_VALUE;
        this.lastOutputWatermarkStatus = WatermarkStatus.ACTIVE;
    }

    /**
     * Feeds a watermark received on one channel into the valve.
     *
     * <p>The watermark is ignored when the valve or the channel is idle, or when it does not
     * advance the channel's own watermark. Otherwise the channel's watermark is updated and a new
     * output watermark is emitted if the minimum across aligned channels has advanced.
     *
     * @param watermark the received watermark
     * @param i index of the channel the watermark arrived on
     * @param dataOutput receiver of any newly emitted watermark
     * @throws Exception propagated from {@code dataOutput}
     */
    public void inputWatermark(Watermark watermark, int i, PushingAsyncDataInput.DataOutput<?> dataOutput) throws Exception {
        if (!this.lastOutputWatermarkStatus.isActive()) {
            return;
        }
        final InputChannelStatus channel = this.channelStatuses[i];
        if (!channel.watermarkStatus.isActive()) {
            return;
        }

        final long timestamp = watermark.getTimestamp();
        if (timestamp <= channel.watermark) {
            // Per-channel watermarks never move backwards (or repeat).
            return;
        }

        channel.watermark = timestamp;
        if (channel.isWatermarkAligned) {
            // Already queued: its ordering key changed, so let the queue re-position it.
            adjustAlignedChannelStatuses(channel);
        } else if (timestamp >= this.lastOutputWatermark) {
            // The channel has caught up with the output watermark and counts again.
            markWatermarkAligned(channel);
        }
        findAndOutputNewMinWatermarkAcrossAlignedChannels(dataOutput);
    }

    /**
     * Feeds a watermark status received on one channel into the valve. Only inputs that toggle
     * the channel's current status (active to idle, or idle to active) have any effect.
     *
     * @param watermarkStatus the received watermark status
     * @param i index of the channel the status arrived on
     * @param dataOutput receiver of any newly emitted watermark or watermark status
     * @throws Exception propagated from {@code dataOutput}
     */
    public void inputWatermarkStatus(WatermarkStatus watermarkStatus, int i, PushingAsyncDataInput.DataOutput<?> dataOutput) throws Exception {
        if (watermarkStatus.isIdle() && this.channelStatuses[i].watermarkStatus.isActive()) {
            handleChannelBecameIdle(this.channelStatuses[i], dataOutput);
        } else if (watermarkStatus.isActive() && this.channelStatuses[i].watermarkStatus.isIdle()) {
            handleChannelBecameActive(this.channelStatuses[i], dataOutput);
        }
    }

    /** Handles the active-to-idle toggle of one channel. */
    private void handleChannelBecameIdle(InputChannelStatus channel, PushingAsyncDataInput.DataOutput<?> dataOutput) throws Exception {
        channel.watermarkStatus = WatermarkStatus.IDLE;
        // An idle channel no longer takes part in the aligned minimum.
        markWatermarkUnaligned(channel);

        // Only a channel sitting exactly at the output watermark may have been holding it back.
        final boolean wasAtOutputWatermark = channel.watermark == this.lastOutputWatermark;

        if (InputChannelStatus.hasActiveChannels(this.channelStatuses)) {
            if (wasAtOutputWatermark) {
                findAndOutputNewMinWatermarkAcrossAlignedChannels(dataOutput);
            }
            return;
        }

        // Every channel is idle now: flush the maximum watermark seen on any channel (if this
        // channel was at the output watermark), then report the valve itself as idle.
        if (wasAtOutputWatermark) {
            findAndOutputMaxWatermarkAcrossAllChannels(dataOutput);
        }
        this.lastOutputWatermarkStatus = WatermarkStatus.IDLE;
        dataOutput.emitWatermarkStatus(this.lastOutputWatermarkStatus);
    }

    /** Handles the idle-to-active toggle of one channel. */
    private void handleChannelBecameActive(InputChannelStatus channel, PushingAsyncDataInput.DataOutput<?> dataOutput) throws Exception {
        channel.watermarkStatus = WatermarkStatus.ACTIVE;
        // The channel is re-aligned immediately only if its old watermark is not behind the
        // output watermark; otherwise it stays active-but-unaligned until it catches up.
        if (channel.watermark >= this.lastOutputWatermark) {
            markWatermarkAligned(channel);
        }
        // The first channel to wake up also wakes the valve.
        if (this.lastOutputWatermarkStatus.isIdle()) {
            this.lastOutputWatermarkStatus = WatermarkStatus.ACTIVE;
            dataOutput.emitWatermarkStatus(this.lastOutputWatermarkStatus);
        }
    }

    /**
     * Emits the watermark at the head of the aligned-channel queue if the queue is non-empty and
     * that watermark is strictly greater than the last emitted one.
     */
    private void findAndOutputNewMinWatermarkAcrossAlignedChannels(PushingAsyncDataInput.DataOutput<?> dataOutput) throws Exception {
        if (this.alignedChannelStatuses.isEmpty()) {
            return;
        }
        final long headWatermark = this.alignedChannelStatuses.peek().watermark;
        if (headWatermark > this.lastOutputWatermark) {
            this.lastOutputWatermark = headWatermark;
            dataOutput.emitWatermark(new Watermark(this.lastOutputWatermark));
        }
    }

    /** Flags the channel as aligned and makes sure it is in the aligned-channel queue. */
    private void markWatermarkAligned(InputChannelStatus inputChannelStatus) {
        inputChannelStatus.isWatermarkAligned = true;
        inputChannelStatus.addTo(this.alignedChannelStatuses);
    }

    /** Flags the channel as unaligned and takes it out of the aligned-channel queue if present. */
    private void markWatermarkUnaligned(InputChannelStatus inputChannelStatus) {
        inputChannelStatus.isWatermarkAligned = false;
        inputChannelStatus.removeFrom(this.alignedChannelStatuses);
    }

    /** Tells the queue that the watermark of an already-queued channel has changed. */
    private void adjustAlignedChannelStatuses(InputChannelStatus inputChannelStatus) {
        this.alignedChannelStatuses.adjustModifiedElement(inputChannelStatus);
    }

    /**
     * Emits the largest watermark found on any channel (aligned or not) if it is strictly greater
     * than the last emitted one.
     */
    private void findAndOutputMaxWatermarkAcrossAllChannels(PushingAsyncDataInput.DataOutput<?> dataOutput) throws Exception {
        long maxWatermark = Long.MIN_VALUE;
        for (InputChannelStatus channel : this.channelStatuses) {
            maxWatermark = Math.max(channel.watermark, maxWatermark);
        }
        if (maxWatermark > this.lastOutputWatermark) {
            this.lastOutputWatermark = maxWatermark;
            dataOutput.emitWatermark(new Watermark(this.lastOutputWatermark));
        }
    }

    /**
     * Returns the tracked state of one channel.
     *
     * @param i the channel index; must lie in {@code [0, number of channels)}
     * @return the status object of that channel
     */
    @VisibleForTesting
    protected InputChannelStatus getInputChannelStatus(int i) {
        Preconditions.checkArgument(i >= 0 && i < this.channelStatuses.length, "Invalid channel index. Number of input channels: " + this.channelStatuses.length);
        return this.channelStatuses[i];
    }
}
