/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorFactory;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class ContinuousProcessingTimeTriggerTest {
    private static final long NO_TIMESTAMP = Watermark.UNINITIALIZED.getTimestamp();

    ContinuousProcessingTimeTriggerTest() {
    }

    @Test
    void testProcessingTimeWindowFiring() throws Exception {
        ContinuousProcessingTimeTrigger trigger = ContinuousProcessingTimeTrigger.of((Duration)Duration.ofMillis(5L));
        Assertions.assertThat((boolean)trigger.canMerge()).isTrue();
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", BasicTypeInfo.INT_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory((WindowAssigner)TumblingProcessingTimeWindows.of((Duration)Duration.ofMillis(10L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new NullByteKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new IntegerSumWindowFunction()), (Trigger)trigger, 0L, null);
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, operator.getKeySelector(), (TypeInformation)BasicTypeInfo.BYTE_TYPE_INFO);
        ArrayDeque<StreamRecord> expectedOutput = new ArrayDeque<StreamRecord>();
        testHarness.open();
        testHarness.getProcessingTimeService().setCurrentTime(0L);
        testHarness.processElement((Object)1, NO_TIMESTAMP);
        testHarness.getProcessingTimeService().setCurrentTime(2L);
        testHarness.processElement((Object)2, NO_TIMESTAMP);
        testHarness.getProcessingTimeService().setCurrentTime(5L);
        expectedOutput.add(new StreamRecord((Object)new WindowedInteger(new TimeWindow(0L, 10L), 3), 9L));
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
        testHarness.getProcessingTimeService().setCurrentTime(7L);
        testHarness.processElement((Object)3, NO_TIMESTAMP);
        testHarness.getProcessingTimeService().setCurrentTime(9L);
        expectedOutput.add(new StreamRecord((Object)new WindowedInteger(new TimeWindow(0L, 10L), 6), 9L));
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
        testHarness.getProcessingTimeService().setCurrentTime(10L);
        testHarness.processElement((Object)3, NO_TIMESTAMP);
        testHarness.getProcessingTimeService().setCurrentTime(15L);
        expectedOutput.add(new StreamRecord((Object)new WindowedInteger(new TimeWindow(10L, 20L), 3), 19L));
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
        testHarness.getProcessingTimeService().setCurrentTime(18L);
        testHarness.processElement((Object)3, NO_TIMESTAMP);
        testHarness.getProcessingTimeService().setCurrentTime(20L);
        expectedOutput.add(new StreamRecord((Object)new WindowedInteger(new TimeWindow(10L, 20L), 6), 19L));
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
    }

    @Test
    public void testEventTimeWindowFiring() throws Exception {
        ContinuousProcessingTimeTrigger trigger = ContinuousProcessingTimeTrigger.of((Duration)Duration.ofMillis(5L));
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", BasicTypeInfo.INT_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory((WindowAssigner)TumblingEventTimeWindows.of((Duration)Duration.ofMillis(10L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new NullByteKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new IntegerSumWindowFunction()), (Trigger)trigger, 0L, null);
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, operator.getKeySelector(), (TypeInformation)BasicTypeInfo.BYTE_TYPE_INFO);
        ArrayDeque<StreamRecord> expectedOutput = new ArrayDeque<StreamRecord>();
        testHarness.open();
        testHarness.getProcessingTimeService().setCurrentTime(0L);
        testHarness.processElement((Object)1, 1L);
        testHarness.processElement((Object)2, 3L);
        testHarness.processElement((Object)3, 7L);
        testHarness.getProcessingTimeService().setCurrentTime(5L);
        expectedOutput.add(new StreamRecord((Object)new WindowedInteger(new TimeWindow(0L, 10L), 6), 9L));
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
        testHarness.processElement((Object)3, 8L);
        testHarness.getProcessingTimeService().setCurrentTime(10L);
        expectedOutput.add(new StreamRecord((Object)new WindowedInteger(new TimeWindow(0L, 10L), 9), 9L));
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
        testHarness.processElement((Object)3, 12L);
        testHarness.getProcessingTimeService().setCurrentTime(15L);
        expectedOutput.add(new StreamRecord((Object)new WindowedInteger(new TimeWindow(10L, 20L), 3), 19L));
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
    }

    @Test
    void testMergingWindows() throws Exception {
        ContinuousProcessingTimeTrigger trigger = ContinuousProcessingTimeTrigger.of((Duration)Duration.ofMillis(5L));
        Assertions.assertThat((boolean)trigger.canMerge()).isTrue();
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", BasicTypeInfo.INT_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperatorFactory operator = new WindowOperatorFactory((WindowAssigner)ProcessingTimeSessionWindows.withGap((Duration)Duration.ofMillis(10L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new NullByteKeySelector(), BasicTypeInfo.BYTE_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new IntegerSumWindowFunction()), (Trigger)trigger, 0L, null);
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)operator, operator.getKeySelector(), (TypeInformation)BasicTypeInfo.BYTE_TYPE_INFO);
        ArrayDeque<StreamRecord> expectedOutput = new ArrayDeque<StreamRecord>();
        testHarness.open();
        testHarness.getProcessingTimeService().setCurrentTime(0L);
        testHarness.processElement((Object)1, NO_TIMESTAMP);
        testHarness.getProcessingTimeService().setCurrentTime(2L);
        testHarness.processElement((Object)2, NO_TIMESTAMP);
        testHarness.getProcessingTimeService().setCurrentTime(5L);
        expectedOutput.add(new StreamRecord((Object)new WindowedInteger(new TimeWindow(0L, 12L), 3), 11L));
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
        testHarness.getProcessingTimeService().setCurrentTime(9L);
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
        testHarness.getProcessingTimeService().setCurrentTime(10L);
        expectedOutput.add(new StreamRecord((Object)new WindowedInteger(new TimeWindow(0L, 12L), 3), 11L));
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
        testHarness.getProcessingTimeService().setCurrentTime(15L);
        expectedOutput.add(new StreamRecord((Object)new WindowedInteger(new TimeWindow(0L, 12L), 3), 11L));
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
        testHarness.getProcessingTimeService().setCurrentTime(100L);
        TestHarnessUtil.assertOutputEquals((String)"Output mismatch", expectedOutput, (Queue)testHarness.getOutput());
    }

    private static class IntegerSumWindowFunction
    implements WindowFunction<Integer, WindowedInteger, Byte, TimeWindow> {
        private IntegerSumWindowFunction() {
        }

        public void apply(Byte key, TimeWindow window, Iterable<Integer> input, Collector<WindowedInteger> out) throws Exception {
            int sum = StreamSupport.stream(input.spliterator(), false).mapToInt(Integer::intValue).sum();
            out.collect((Object)new WindowedInteger(window, sum));
        }
    }

    private static class WindowedInteger {
        private final TimeWindow window;
        private final int value;

        public WindowedInteger(TimeWindow window, int value) {
            this.window = window;
            this.value = value;
        }

        public boolean equals(Object o) {
            if (!(o instanceof WindowedInteger)) {
                return false;
            }
            WindowedInteger other = (WindowedInteger)o;
            return this.value == other.value && Objects.equals(this.window, other.window);
        }

        public int hashCode() {
            return Objects.hash(this.window, this.value);
        }

        public String toString() {
            return "WindowedInteger{window=" + this.window + ", value=" + this.value + "}";
        }
    }
}

