package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamGroupedReduceOperatorTest.class */
public class StreamGroupedReduceOperatorTest {
    private static TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamGroupedReduceOperatorTest$IntegerKeySelector.class */
    private static class IntegerKeySelector implements KeySelector<Integer, Integer> {
        private static final long serialVersionUID = 1;

        private IntegerKeySelector() {
        }

        public Integer getKey(Integer num) throws Exception {
            return num;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamGroupedReduceOperatorTest$MyReducer.class */
    private static class MyReducer implements ReduceFunction<Integer> {
        private static final long serialVersionUID = 1;

        private MyReducer() {
        }

        public Integer reduce(Integer num, Integer num2) throws Exception {
            return Integer.valueOf(num.intValue() + num2.intValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamGroupedReduceOperatorTest$TestOpenCloseReduceFunction.class */
    private static class TestOpenCloseReduceFunction extends RichReduceFunction<Integer> {
        private static final long serialVersionUID = 1;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseReduceFunction() {
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        public Integer reduce(Integer num, Integer num2) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return Integer.valueOf(num.intValue() + num2.intValue());
        }
    }

    @Test
    public void testGroupedReduce() throws Exception {
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator) new StreamGroupedReduceOperator(new MyReducer(), IntSerializer.INSTANCE), (KeySelector) new IntegerKeySelector(), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        keyedOneInputStreamOperatorTestHarness.open();
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(1, 0 + 1));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(1, 0 + 2));
        keyedOneInputStreamOperatorTestHarness.processWatermark(new Watermark(0 + 2));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(2, 0 + 3));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(2, 0 + 4));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(3, 0 + 5));
        concurrentLinkedQueue.add(new StreamRecord(1, 0 + 1));
        concurrentLinkedQueue.add(new StreamRecord(2, 0 + 2));
        concurrentLinkedQueue.add(new Watermark(0 + 2));
        concurrentLinkedQueue.add(new StreamRecord(2, 0 + 3));
        concurrentLinkedQueue.add(new StreamRecord(4, 0 + 4));
        concurrentLinkedQueue.add(new StreamRecord(3, 0 + 5));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, keyedOneInputStreamOperatorTestHarness.getOutput());
    }

    @Test
    public void testOpenClose() throws Exception {
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator) new StreamGroupedReduceOperator(new TestOpenCloseReduceFunction(), IntSerializer.INSTANCE), (KeySelector) new IntegerKeySelector(), (TypeInformation) BasicTypeInfo.INT_TYPE_INFO);
        keyedOneInputStreamOperatorTestHarness.open();
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(1, 0L));
        keyedOneInputStreamOperatorTestHarness.processElement(new StreamRecord(2, 0L));
        keyedOneInputStreamOperatorTestHarness.close();
        Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseReduceFunction.closeCalled);
        Assert.assertTrue("Output contains no elements.", keyedOneInputStreamOperatorTestHarness.getOutput().size() > 0);
    }
}
