package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichAggregateFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.class */
public class AllWindowTranslationTest {

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest$DummyAggregationFunction.class */
    private static class DummyAggregationFunction implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
        private DummyAggregationFunction() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Tuple2<String, Integer> m111createAccumulator() {
            return new Tuple2<>("", 0);
        }

        public Tuple2<String, Integer> add(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) {
            tuple22.f0 = tuple2.f0;
            tuple22.f1 = tuple2.f1;
            return tuple22;
        }

        public Tuple2<String, Integer> getResult(Tuple2<String, Integer> tuple2) {
            return tuple2;
        }

        public Tuple2<String, Integer> merge(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) {
            return tuple2;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest$DummyReducer.class */
    private static class DummyReducer implements ReduceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1;

        private DummyReducer() {
        }

        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
            return tuple2;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest$DummyRichAggregationFunction.class */
    private static class DummyRichAggregationFunction<T> extends RichAggregateFunction<T, T, T> {
        private DummyRichAggregationFunction() {
        }

        public T createAccumulator() {
            return null;
        }

        public T add(T t, T t2) {
            return t2;
        }

        public T getResult(T t) {
            return t;
        }

        public T merge(T t, T t2) {
            return t;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest$TestAllWindowFunction.class */
    private static class TestAllWindowFunction implements AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow> {
        private TestAllWindowFunction() {
        }

        public void apply(TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
            for (Tuple2<String, Integer> tuple2 : iterable) {
                collector.collect(new Tuple3(tuple2.f0, tuple2.f0, tuple2.f1));
            }
        }

        public /* bridge */ /* synthetic */ void apply(Window window, Iterable iterable, Collector collector) throws Exception {
            apply((TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple3<String, String, Integer>>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest$TestProcessAllWindowFunction.class */
    private static class TestProcessAllWindowFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow> {
        private TestProcessAllWindowFunction() {
        }

        public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
            for (Tuple2<String, Integer> tuple2 : iterable) {
                collector.collect(new Tuple3(tuple2.f0, tuple2.f0, tuple2.f1));
            }
        }
    }

    @Test(expected = UnsupportedOperationException.class)
    public void testReduceWithRichReducerFails() throws Exception {
        StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(SlidingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS), Time.of(100L, TimeUnit.MILLISECONDS))).reduce(new RichReduceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.1
            private static final long serialVersionUID = -6448847205314995812L;

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22) throws Exception {
                return null;
            }
        });
        Assert.fail("exception was not thrown");
    }

    @Test(expected = UnsupportedOperationException.class)
    public void testAggregateWithRichFunctionFails() throws Exception {
        StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(SlidingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS), Time.of(100L, TimeUnit.MILLISECONDS))).aggregate(new DummyRichAggregationFunction());
        Assert.fail("exception was not thrown");
    }

    @Test
    public void testMergingAssignerWithNonMergingTriggerFails() throws Exception {
        this.expectedException.expect(UnsupportedOperationException.class);
        this.expectedException.expectMessage("A merging window assigner cannot be used with a trigger that does not support merging");
        StreamExecutionEnvironment.getExecutionEnvironment().fromData(new String[]{"Hello", "Ciao"}).windowAll(EventTimeSessionWindows.withGap(Time.seconds(5L))).trigger(new Trigger<String, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.2
            private static final long serialVersionUID = 6558046711583024443L;

            public TriggerResult onElement(String str, long j, TimeWindow timeWindow, Trigger.TriggerContext triggerContext) throws Exception {
                return null;
            }

            public TriggerResult onProcessingTime(long j, TimeWindow timeWindow, Trigger.TriggerContext triggerContext) throws Exception {
                return null;
            }

            public TriggerResult onEventTime(long j, TimeWindow timeWindow, Trigger.TriggerContext triggerContext) throws Exception {
                return null;
            }

            public boolean canMerge() {
                return false;
            }

            public void clear(TimeWindow timeWindow, Trigger.TriggerContext triggerContext) throws Exception {
            }
        });
    }

    @Test
    public void testMergingWindowsWithEvictor() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(EventTimeSessionWindows.withGap(Time.seconds(5L))).evictor(CountEvictor.of(5L)).process(new TestProcessAllWindowFunction()).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof EventTimeSessionWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testReduceEventTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(SlidingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS), Time.of(100L, TimeUnit.MILLISECONDS))).reduce(new DummyReducer()).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testReduceProcessingTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(SlidingProcessingTimeWindows.of(Time.of(1L, TimeUnit.SECONDS), Time.of(100L, TimeUnit.MILLISECONDS))).reduce(new DummyReducer()).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof ProcessingTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testReduceWithWindowFunctionEventTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).reduce(new DummyReducer(), new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.3
            private static final long serialVersionUID = 1;

            public void apply(TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
                for (Tuple2<String, Integer> tuple2 : iterable) {
                    collector.collect(new Tuple3(tuple2.f0, tuple2.f0, tuple2.f1));
                }
            }

            public /* bridge */ /* synthetic */ void apply(Window window, Iterable iterable, Collector collector) throws Exception {
                apply((TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple3<String, String, Integer>>) collector);
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
        processElementAndEnsureOutput(operator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testReduceWithWindowFunctionProcessingTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingProcessingTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).reduce(new DummyReducer(), new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.4
            private static final long serialVersionUID = 1;

            public void apply(TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
                for (Tuple2<String, Integer> tuple2 : iterable) {
                    collector.collect(new Tuple3(tuple2.f0, tuple2.f0, tuple2.f1));
                }
            }

            public /* bridge */ /* synthetic */ void apply(Window window, Iterable iterable, Collector collector) throws Exception {
                apply((TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple3<String, String, Integer>>) collector);
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof ProcessingTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
        processElementAndEnsureOutput(operator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testReduceWithProcessWindowFunctionEventTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).reduce(new DummyReducer(), new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.5
            private static final long serialVersionUID = 1;

            public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
                for (Tuple2<String, Integer> tuple2 : iterable) {
                    collector.collect(new Tuple3(tuple2.f0, tuple2.f0, tuple2.f1));
                }
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
        processElementAndEnsureOutput(operator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingProcessingTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).reduce(new DummyReducer(), new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.6
            private static final long serialVersionUID = 1;

            public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
                for (Tuple2<String, Integer> tuple2 : iterable) {
                    collector.collect(new Tuple3(tuple2.f0, tuple2.f0, tuple2.f1));
                }
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof ProcessingTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
        processElementAndEnsureOutput(operator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testReduceWithEvictorAndProcessFunction() throws Exception {
        EvictingWindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(SlidingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS), Time.of(100L, TimeUnit.MILLISECONDS))).evictor(CountEvictor.of(100L)).reduce(new DummyReducer(), new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.7
            public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Iterator<Tuple2<String, Integer>> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof EvictingWindowOperator);
        EvictingWindowOperator evictingWindowOperator = operator;
        Assert.assertTrue(evictingWindowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(evictingWindowOperator.getEvictor() instanceof CountEvictor);
        Assert.assertTrue(evictingWindowOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
        Assert.assertTrue(evictingWindowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(evictingWindowOperator, evictingWindowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testApplyWithPreReducerEventTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).apply(new DummyReducer(), new AllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.8
            private static final long serialVersionUID = 1;

            public void apply(TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
                for (Tuple2<String, Integer> tuple2 : iterable) {
                    collector.collect(new Tuple3(tuple2.f0, tuple2.f0, tuple2.f1));
                }
            }

            public /* bridge */ /* synthetic */ void apply(Window window, Iterable iterable, Collector collector) throws Exception {
                apply((TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple3<String, String, Integer>>) collector);
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
        processElementAndEnsureOutput(operator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testAggregateEventTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(SlidingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS), Time.of(100L, TimeUnit.MILLISECONDS))).aggregate(new DummyAggregationFunction()).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testAggregateProcessingTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(SlidingProcessingTimeWindows.of(Time.of(1L, TimeUnit.SECONDS), Time.of(100L, TimeUnit.MILLISECONDS))).aggregate(new DummyAggregationFunction()).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof ProcessingTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testAggregateWithWindowFunctionEventTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).aggregate(new DummyAggregationFunction(), new TestAllWindowFunction()).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
        processElementAndEnsureOutput(operator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testAggregateWithWindowFunctionProcessingTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingProcessingTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).aggregate(new DummyAggregationFunction(), new TestAllWindowFunction()).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof ProcessingTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
        processElementAndEnsureOutput(operator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testAggregateWithEvictor() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(SlidingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS), Time.of(100L, TimeUnit.MILLISECONDS))).evictor(CountEvictor.of(100L)).aggregate(new DummyAggregationFunction()).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testAggregateWithEvictorAndProcessFunction() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(SlidingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS), Time.of(100L, TimeUnit.MILLISECONDS))).evictor(CountEvictor.of(100L)).aggregate(new DummyAggregationFunction(), new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.9
            public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Iterator<Tuple2<String, Integer>> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testProcessEventTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.10
            private static final long serialVersionUID = 1;

            public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Iterator<Tuple2<String, Integer>> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testProcessProcessingTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingProcessingTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.11
            private static final long serialVersionUID = 1;

            public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Iterator<Tuple2<String, Integer>> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof ProcessingTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testProcessWithEvictor() throws Exception {
        EvictingWindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).trigger(CountTrigger.of(1L)).evictor(TimeEvictor.of(Time.of(100L, TimeUnit.MILLISECONDS))).process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.12
            private static final long serialVersionUID = 1;

            public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Iterator<Tuple2<String, Integer>> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof EvictingWindowOperator);
        EvictingWindowOperator evictingWindowOperator = operator;
        Assert.assertTrue(evictingWindowOperator.getTrigger() instanceof CountTrigger);
        Assert.assertTrue(evictingWindowOperator.getEvictor() instanceof TimeEvictor);
        Assert.assertTrue(evictingWindowOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
        Assert.assertTrue(evictingWindowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(evictingWindowOperator, evictingWindowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testProcessWithCustomTrigger() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).trigger(CountTrigger.of(1L)).process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.13
            private static final long serialVersionUID = 1;

            public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Iterator<Tuple2<String, Integer>> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof CountTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testApplyEventTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.14
            private static final long serialVersionUID = 1;

            public void apply(TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Iterator<Tuple2<String, Integer>> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
            }

            public /* bridge */ /* synthetic */ void apply(Window window, Iterable iterable, Collector collector) throws Exception {
                apply((TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple2<String, Integer>>) collector);
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testApplyProcessingTimeTime() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingProcessingTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.15
            private static final long serialVersionUID = 1;

            public void apply(TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Iterator<Tuple2<String, Integer>> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
            }

            public /* bridge */ /* synthetic */ void apply(Window window, Iterable iterable, Collector collector) throws Exception {
                apply((TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple2<String, Integer>>) collector);
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof ProcessingTimeTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testReduceWithCustomTrigger() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(SlidingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS), Time.of(100L, TimeUnit.MILLISECONDS))).trigger(CountTrigger.of(1L)).reduce(new DummyReducer()).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof CountTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testApplyWithCustomTrigger() throws Exception {
        WindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).trigger(CountTrigger.of(1L)).apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.16
            private static final long serialVersionUID = 1;

            public void apply(TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Iterator<Tuple2<String, Integer>> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
            }

            public /* bridge */ /* synthetic */ void apply(Window window, Iterable iterable, Collector collector) throws Exception {
                apply((TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple2<String, Integer>>) collector);
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof WindowOperator);
        WindowOperator windowOperator = operator;
        Assert.assertTrue(windowOperator.getTrigger() instanceof CountTrigger);
        Assert.assertTrue(windowOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
        Assert.assertTrue(windowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(windowOperator, windowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testReduceWithEvictor() throws Exception {
        EvictingWindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(SlidingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS), Time.of(100L, TimeUnit.MILLISECONDS))).evictor(CountEvictor.of(100L)).reduce(new DummyReducer()).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof EvictingWindowOperator);
        EvictingWindowOperator evictingWindowOperator = operator;
        Assert.assertTrue(evictingWindowOperator.getTrigger() instanceof EventTimeTrigger);
        Assert.assertTrue(evictingWindowOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
        Assert.assertTrue(evictingWindowOperator.getEvictor() instanceof CountEvictor);
        Assert.assertTrue(evictingWindowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(evictingWindowOperator, evictingWindowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    @Test
    public void testApplyWithEvictor() throws Exception {
        EvictingWindowOperator operator = StreamExecutionEnvironment.getExecutionEnvironment().fromData(new Tuple2[]{Tuple2.of("hello", 1), Tuple2.of("hello", 2)}).windowAll(TumblingEventTimeWindows.of(Time.of(1L, TimeUnit.SECONDS))).trigger(CountTrigger.of(1L)).evictor(TimeEvictor.of(Time.of(100L, TimeUnit.MILLISECONDS))).apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.AllWindowTranslationTest.17
            private static final long serialVersionUID = 1;

            public void apply(TimeWindow timeWindow, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                Iterator<Tuple2<String, Integer>> it = iterable.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
            }

            public /* bridge */ /* synthetic */ void apply(Window window, Iterable iterable, Collector collector) throws Exception {
                apply((TimeWindow) window, (Iterable<Tuple2<String, Integer>>) iterable, (Collector<Tuple2<String, Integer>>) collector);
            }
        }).getTransformation().getOperator();
        Assert.assertTrue(operator instanceof EvictingWindowOperator);
        EvictingWindowOperator evictingWindowOperator = operator;
        Assert.assertTrue(evictingWindowOperator.getTrigger() instanceof CountTrigger);
        Assert.assertTrue(evictingWindowOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
        Assert.assertTrue(evictingWindowOperator.getEvictor() instanceof TimeEvictor);
        Assert.assertTrue(evictingWindowOperator.getStateDescriptor() instanceof ListStateDescriptor);
        processElementAndEnsureOutput(evictingWindowOperator, evictingWindowOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2("hello", 1));
    }

    private static <K, IN, OUT> void processElementAndEnsureOutput(OneInputStreamOperator<IN, OUT> oneInputStreamOperator, KeySelector<IN, K> keySelector, TypeInformation<K> typeInformation, IN in) throws Exception {
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(oneInputStreamOperator, keySelector, typeInformation);
        keyedOneInputStreamOperatorTestHarness.open();
        keyedOneInputStreamOperatorTestHarness.setProcessingTime(0L);
        keyedOneInputStreamOperatorTestHarness.processWatermark(Long.MIN_VALUE);
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord<>(in, 0L));
        keyedOneInputStreamOperatorTestHarness.setProcessingTime(Long.MAX_VALUE);
        keyedOneInputStreamOperatorTestHarness.processWatermark(Long.MAX_VALUE);
        Assert.assertTrue(keyedOneInputStreamOperatorTestHarness.getOutput().size() >= 3);
        keyedOneInputStreamOperatorTestHarness.close();
    }
}
