package org.apache.flink.streaming.runtime.watermarkstatus;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValveTest.class */
class StatusWatermarkValveTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/watermarkstatus/StatusWatermarkValveTest$StatusWatermarkOutput.class */
    private static class StatusWatermarkOutput implements PushingAsyncDataInput.DataOutput {
        private BlockingQueue<StreamElement> allOutputs;

        private StatusWatermarkOutput() {
            this.allOutputs = new LinkedBlockingQueue();
        }

        public void emitWatermark(Watermark watermark) {
            this.allOutputs.add(watermark);
        }

        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
            this.allOutputs.add(watermarkStatus);
        }

        public void emitRecord(StreamRecord streamRecord) {
            throw new UnsupportedOperationException();
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            throw new UnsupportedOperationException();
        }

        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
            throw new UnsupportedOperationException();
        }

        public StreamElement popLastSeenOutput() {
            return this.allOutputs.poll();
        }
    }

    StatusWatermarkValveTest() {
    }

    @Test
    void testSingleInputIncreasingWatermarks() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(1);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(0L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(25L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testSingleInputDecreasingWatermarksYieldsNoOutput() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(1);
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(25L));
        statusWatermarkValve.inputWatermark(new Watermark(18L), 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermark(new Watermark(42L), 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(42L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testSingleInputWatermarkStatusToggling() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(1);
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE);
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.ACTIVE);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testSingleInputWatermarksIntactDuringIdleness() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(1);
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(25L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE);
        statusWatermarkValve.inputWatermark(new Watermark(50L), 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        Assertions.assertThat(statusWatermarkValve.getSubpartitionStatus(0).watermark).isEqualTo(25L);
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.ACTIVE);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermark(new Watermark(50L), 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(50L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testMultipleInputYieldsWatermarkOnlyWhenAllChannelsReceivesWatermarks() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermark(new Watermark(0L), 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(0L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testMultipleInputIncreasingWatermarks() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 1, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(0L), 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(0L));
        statusWatermarkValve.inputWatermark(new Watermark(12L), 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(8L), 2, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermark(new Watermark(15L), 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermark(new Watermark(17L), 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(12L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermark(new Watermark(20L), 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(15L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testMultipleInputDecreasingWatermarksYieldsNoOutput() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3);
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 1, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(17L), 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));
        statusWatermarkValve.inputWatermark(new Watermark(12L), 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(8L), 1, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(15L), 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testMultipleInputWatermarkStatusToggling() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(2);
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE);
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.ACTIVE);
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testMultipleInputWatermarkAdvancingWithPartiallyIdleChannels() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3);
        statusWatermarkValve.inputWatermark(new Watermark(15L), 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermark(new Watermark(18L), 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(15L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermark(new Watermark(20L), 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(18L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testMultipleInputWatermarkAdvancingAsChannelsIndividuallyBecomeIdle() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3);
        statusWatermarkValve.inputWatermark(new Watermark(25L), 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 1, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(17L), 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(17L));
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(25L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testMultipleInputFlushMaxWatermarkAndWatermarkStatusOnceAllInputsBecomeIdle() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(5L), 1, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(3L), 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(3L));
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testMultipleInputWatermarkRealignmentAfterResumeActive() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(7L), 1, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(3L), 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(3L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(7L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkValve.getSubpartitionStatus(2).isWatermarkAligned).isFalse();
        statusWatermarkValve.inputWatermark(new Watermark(5L), 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkValve.getSubpartitionStatus(2).watermark).isEqualTo(5L);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermark(new Watermark(9L), 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkValve.getSubpartitionStatus(2).isWatermarkAligned).isTrue();
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermark(new Watermark(12L), 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(9L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testNoOutputWhenAllActiveChannelsAreUnaligned() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(3);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(7L), 1, statusWatermarkOutput);
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(7L));
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 2, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }

    @Test
    void testUnalignedActiveChannelBecomesIdle() throws Exception {
        StatusWatermarkOutput statusWatermarkOutput = new StatusWatermarkOutput();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(2);
        statusWatermarkValve.inputWatermark(new Watermark(7L), 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(10L), 1, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(7L));
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, statusWatermarkOutput);
        statusWatermarkValve.inputWatermark(new Watermark(9L), 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
        statusWatermarkValve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, statusWatermarkOutput);
        Assertions.assertThat(statusWatermarkOutput.popLastSeenOutput()).isNull();
    }
}
