/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.watermarkstatus;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link StatusWatermarkValve}.
 *
 * <p>Each test feeds a sequence of {@link Watermark}s and {@link WatermarkStatus} changes into a
 * valve on specific input indices and then inspects, in emission order, what the valve forwarded
 * to a recording {@link StatusWatermarkOutput}. A {@code null} result from
 * {@link StatusWatermarkOutput#popLastSeenOutput()} means "nothing (more) was emitted".
 */
class StatusWatermarkValveTest {

    /** With a single input, each increasing watermark is forwarded exactly once. */
    @Test
    void testSingleInputIncreasingWatermarks() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(1);

        valve.inputWatermark(new Watermark(0L), 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(0L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        valve.inputWatermark(new Watermark(25L), 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(25L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /** With a single input, a watermark lower than the previously forwarded one is not emitted. */
    @Test
    void testSingleInputDecreasingWatermarksYieldsNoOutput() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(1);

        valve.inputWatermark(new Watermark(25L), 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(25L));

        // 18 < 25: expected to be swallowed.
        valve.inputWatermark(new Watermark(18L), 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // 42 > 25: forwarded again.
        valve.inputWatermark(new Watermark(42L), 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(42L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /**
     * With a single input, only actual status changes are forwarded; repeating the current status
     * (including an initial ACTIVE) yields no output.
     */
    @Test
    void testSingleInputWatermarkStatusToggling() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(1);

        // ACTIVE on a fresh valve produces nothing.
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE);

        // A second IDLE is not a change.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.ACTIVE);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /**
     * With a single input, a watermark arriving while the input is IDLE is neither emitted nor
     * recorded for that input; after the input turns ACTIVE again the same watermark is forwarded.
     */
    @Test
    void testSingleInputWatermarksIntactDuringIdleness() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(1);

        valve.inputWatermark(new Watermark(25L), 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(25L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE);

        // While idle: no output, and the per-input watermark stays at 25.
        valve.inputWatermark(new Watermark(50L), 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
        Assertions.assertThat(valve.getSubpartitionStatus(0).watermark).isEqualTo(25L);

        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.ACTIVE);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        valve.inputWatermark(new Watermark(50L), 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(50L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /** With three inputs, nothing is emitted until every input has delivered a watermark. */
    @Test
    void testMultipleInputYieldsWatermarkOnlyWhenAllChannelsReceivesWatermarks() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(0L), 0, valveOutput);
        valve.inputWatermark(new Watermark(0L), 1, valveOutput);
        // Input 2 has not reported yet.
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        valve.inputWatermark(new Watermark(0L), 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(0L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /**
     * With three inputs, each emitted watermark equals the smallest of the inputs' latest
     * watermarks and is only emitted once that minimum moves forward.
     */
    @Test
    void testMultipleInputIncreasingWatermarks() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(0L), 0, valveOutput);
        valve.inputWatermark(new Watermark(0L), 1, valveOutput);
        valve.inputWatermark(new Watermark(0L), 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(0L));

        // Inputs 0 and 2 advance, but input 1 is still at 0, so nothing is emitted.
        valve.inputWatermark(new Watermark(12L), 0, valveOutput);
        valve.inputWatermark(new Watermark(8L), 2, valveOutput);
        valve.inputWatermark(new Watermark(10L), 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Inputs now at (12, 15, 10).
        valve.inputWatermark(new Watermark(15L), 1, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Inputs now at (12, 15, 17).
        valve.inputWatermark(new Watermark(17L), 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(12L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Inputs now at (20, 15, 17).
        valve.inputWatermark(new Watermark(20L), 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(15L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /** With three inputs, watermarks lower than each input's previous one yield no output. */
    @Test
    void testMultipleInputDecreasingWatermarksYieldsNoOutput() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(25L), 0, valveOutput);
        valve.inputWatermark(new Watermark(10L), 1, valveOutput);
        valve.inputWatermark(new Watermark(17L), 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));

        // Every input regresses; nothing may be emitted.
        valve.inputWatermark(new Watermark(12L), 0, valveOutput);
        valve.inputWatermark(new Watermark(8L), 1, valveOutput);
        valve.inputWatermark(new Watermark(15L), 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /**
     * With two inputs, IDLE is forwarded only once both inputs are idle, and ACTIVE is forwarded
     * as soon as the first input resumes; redundant status updates yield no output.
     */
    @Test
    void testMultipleInputWatermarkStatusToggling() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(2);

        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput);
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 1, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Only one of two inputs idle: no status change downstream.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Both inputs idle now.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE);

        // Repeating IDLE on already-idle inputs changes nothing.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // First input to resume flips the output to ACTIVE ...
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 1, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.ACTIVE);

        // ... and the second one resuming adds nothing.
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /**
     * With three inputs, marking the one input that never delivered a watermark as IDLE lets the
     * minimum over the remaining inputs be emitted and keep advancing.
     */
    @Test
    void testMultipleInputWatermarkAdvancingWithPartiallyIdleChannels() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(15L), 0, valveOutput);
        valve.inputWatermark(new Watermark(10L), 1, valveOutput);
        // Input 2 has not reported anything yet.
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Input 2 goes idle; remaining inputs are at (15, 10).
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Remaining inputs at (15, 18).
        valve.inputWatermark(new Watermark(18L), 1, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(15L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Remaining inputs at (20, 18).
        valve.inputWatermark(new Watermark(20L), 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(18L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /**
     * With three inputs, each time the input currently holding back the output goes IDLE, the
     * output advances to the minimum of the inputs that are still active.
     */
    @Test
    void testMultipleInputWatermarkAdvancingAsChannelsIndividuallyBecomeIdle() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(25L), 0, valveOutput);
        valve.inputWatermark(new Watermark(10L), 1, valveOutput);
        valve.inputWatermark(new Watermark(17L), 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));

        // Input 1 (at 10) idles; active inputs are at (25, 17).
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(17L));

        // Input 2 (at 17) idles; only input 0 (at 25) remains active.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(25L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /**
     * With three inputs, once the last active input goes IDLE the valve first emits the largest
     * watermark seen across the inputs and then the IDLE status.
     */
    @Test
    void testMultipleInputFlushMaxWatermarkAndWatermarkStatusOnceAllInputsBecomeIdle()
            throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(10L), 0, valveOutput);
        valve.inputWatermark(new Watermark(5L), 1, valveOutput);
        valve.inputWatermark(new Watermark(3L), 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(3L));

        // Inputs 0 and 1 idle; input 2 (at 3) is still active, so nothing new is emitted.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Last active input idles: watermark 10 is flushed, followed by IDLE.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(WatermarkStatus.IDLE);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /**
     * An input that resumes ACTIVE after the output has advanced past its watermark is flagged as
     * unaligned; its watermarks are recorded but do not influence the output until one reaches the
     * last emitted watermark, at which point the input is flagged as aligned again.
     */
    @Test
    void testMultipleInputWatermarkRealignmentAfterResumeActive() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(10L), 0, valveOutput);
        valve.inputWatermark(new Watermark(7L), 1, valveOutput);
        valve.inputWatermark(new Watermark(3L), 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(3L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Input 2 idles; output advances to 7.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(7L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Input 2 resumes while still at 3, i.e. behind the emitted 7.
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 2, valveOutput);
        Assertions.assertThat(valve.getSubpartitionStatus(2).isWatermarkAligned).isFalse();

        // 5 is recorded for input 2 but is still behind 7: no output.
        valve.inputWatermark(new Watermark(5L), 2, valveOutput);
        Assertions.assertThat(valve.getSubpartitionStatus(2).watermark).isEqualTo(5L);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // 9 catches up with the output; input 1 is still at 7, so nothing new is emitted.
        valve.inputWatermark(new Watermark(9L), 2, valveOutput);
        Assertions.assertThat(valve.getSubpartitionStatus(2).isWatermarkAligned).isTrue();
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Inputs now at (10, 12, 9).
        valve.inputWatermark(new Watermark(12L), 1, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(9L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /**
     * When the only input left active is one that resumed without having delivered a watermark,
     * idling the other inputs produces no output.
     */
    @Test
    void testNoOutputWhenAllActiveChannelsAreUnaligned() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(3);

        valve.inputWatermark(new Watermark(10L), 0, valveOutput);
        valve.inputWatermark(new Watermark(7L), 1, valveOutput);
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(7L));
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Input 2 resumes without ever having sent a watermark.
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 2, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // The two inputs that had delivered watermarks go idle.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 1, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /**
     * An input that resumed behind the emitted watermark and goes IDLE again before catching up
     * causes no output.
     */
    @Test
    void testUnalignedActiveChannelBecomesIdle() throws Exception {
        StatusWatermarkOutput valveOutput = new StatusWatermarkOutput();
        StatusWatermarkValve valve = new StatusWatermarkValve(2);

        valve.inputWatermark(new Watermark(7L), 0, valveOutput);
        valve.inputWatermark(new Watermark(10L), 1, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(7L));

        // Input 0 idles; output advances to input 1's 10.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isEqualTo(new Watermark(10L));

        // Input 0 resumes with 9, still behind the emitted 10.
        valve.inputWatermarkStatus(WatermarkStatus.ACTIVE, 0, valveOutput);
        valve.inputWatermark(new Watermark(9L), 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();

        // Going idle again must not emit anything either.
        valve.inputWatermarkStatus(WatermarkStatus.IDLE, 0, valveOutput);
        Assertions.assertThat(valveOutput.popLastSeenOutput()).isNull();
    }

    /**
     * Recording {@link PushingAsyncDataInput.DataOutput} used as the valve's sink in these tests.
     *
     * <p>Emitted watermarks and watermark statuses are appended to a FIFO queue and handed back
     * one at a time by {@link #popLastSeenOutput()}. Records, latency markers and record
     * attributes are not expected here and are rejected with
     * {@link UnsupportedOperationException}.
     *
     * <p>NOTE(review): the interface and {@link StreamRecord} are used as raw types here;
     * presumably both are generic - confirm before adding type arguments.
     */
    private static class StatusWatermarkOutput implements PushingAsyncDataInput.DataOutput {

        /** Everything emitted so far, oldest element first. */
        private final BlockingQueue<StreamElement> allOutputs = new LinkedBlockingQueue<>();

        /** Records the emitted watermark. */
        @Override
        public void emitWatermark(Watermark watermark) {
            allOutputs.add(watermark);
        }

        /** Records the emitted watermark status. */
        @Override
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
            allOutputs.add(watermarkStatus);
        }

        /**
         * Not supported by this test output.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void emitRecord(StreamRecord record) {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported by this test output.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported by this test output.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void emitRecordAttributes(RecordAttributes recordAttributes) {
            throw new UnsupportedOperationException();
        }

        /**
         * Removes and returns the oldest element that has not been popped yet.
         *
         * @return the next recorded watermark or watermark status, or {@code null} if none is
         *     pending
         */
        public StreamElement popLastSeenOutput() {
            return allOutputs.poll();
        }
    }
}

