/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.KeyedMultiInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class AbstractStreamOperatorV2Test
extends AbstractStreamOperatorTest {
    /** Package-private no-arg constructor; it only delegates to the superclass constructor. */
    AbstractStreamOperatorV2Test() {
        super();
    }

    /**
     * Creates a keyed one-input test harness around a {@link TestOperatorFactory}, keyed by
     * {@code AbstractStreamOperatorTest.TestKeySelector} with {@code Integer} keys.
     *
     * <p>NOTE(review): this is presumably the hook the inherited tests in
     * {@code AbstractStreamOperatorTest} use to obtain their harness - confirm against the
     * base class.
     *
     * @param maxParalelism max parallelism handed to the harness
     * @param numSubtasks number of parallel subtasks handed to the harness
     * @param subtaskIndex index of the subtask the harness represents
     * @return a new, not yet opened harness
     * @throws Exception if the harness cannot be constructed
     */
    @Override
    protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness(int maxParalelism, int numSubtasks, int subtaskIndex) throws Exception {
        // Diamond inference replaces the raw-type construction and the redundant casts.
        return new KeyedOneInputStreamOperatorTestHarness<>(
                new TestOperatorFactory(),
                new AbstractStreamOperatorTest.TestKeySelector(),
                BasicTypeInfo.INT_TYPE_INFO,
                maxParalelism,
                numSubtasks,
                subtaskIndex);
    }

    @Test
    void testIdleWatermarkHandling() throws Exception {
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeySelector & Serializable dummyKeySelector = (KeySelector & Serializable)l -> 0;
        try (KeyedMultiInputStreamOperatorTestHarness testHarness = new KeyedMultiInputStreamOperatorTestHarness((StreamOperatorFactory)new WatermarkTestingOperatorFactory(), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);){
            testHarness.setKeySelector(0, (KeySelector)dummyKeySelector);
            testHarness.setKeySelector(1, (KeySelector)dummyKeySelector);
            testHarness.setKeySelector(2, (KeySelector)dummyKeySelector);
            testHarness.setup();
            testHarness.open();
            testHarness.processElement(0, new StreamRecord((Object)1L, 1L));
            testHarness.processElement(0, new StreamRecord((Object)3L, 3L));
            testHarness.processElement(0, new StreamRecord((Object)4L, 4L));
            testHarness.processWatermark(0, new Watermark(1L));
            Assertions.assertThat((Collection)testHarness.getOutput()).isEmpty();
            testHarness.processWatermarkStatus(1, WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
            testHarness.processWatermarkStatus(2, WatermarkStatus.IDLE);
            expectedOutput.add(new StreamRecord((Object)1L));
            expectedOutput.add(new Watermark(1L));
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
            testHarness.processWatermark(0, new Watermark(3L));
            expectedOutput.add(new StreamRecord((Object)3L));
            expectedOutput.add(new Watermark(3L));
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
            testHarness.processWatermarkStatus(1, WatermarkStatus.ACTIVE);
            testHarness.processWatermark(0, new Watermark(4L));
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
        }
    }

    /**
     * Checks that {@link WatermarkStatus#IDLE} appears in the output only after all three inputs
     * of the {@link WatermarkTestingOperator} have reported idle.
     */
    @Test
    void testIdlenessForwarding() throws Exception {
        final String failureMessage = "Output was not correct";
        // Typed as Object so it shares an element type with the harness output queue.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        try (KeyedMultiInputStreamOperatorTestHarness<Integer, Long> testHarness =
                new KeyedMultiInputStreamOperatorTestHarness<>(
                        new WatermarkTestingOperatorFactory(), BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            // Two of three inputs idle: nothing is expected downstream yet.
            testHarness.processWatermarkStatus(0, WatermarkStatus.IDLE);
            testHarness.processWatermarkStatus(1, WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

            // The last input going idle makes the operator emit IDLE.
            testHarness.processWatermarkStatus(2, WatermarkStatus.IDLE);
            expectedOutput.add(WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Sends backlog and then non-backlog {@link RecordAttributes} to each of the three inputs and
     * checks the attributes emitted for every call.
     *
     * <p>The expected sequence encodes that the output stays "backlog" until the last input has
     * switched to non-backlog.
     */
    @Test
    void testRecordAttributesForwarding() throws Exception {
        // Typed as Object so it shares an element type with the harness output queue.
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        try (KeyedMultiInputStreamOperatorTestHarness<Integer, Long> testHarness =
                new KeyedMultiInputStreamOperatorTestHarness<>(
                        new WatermarkTestingOperatorFactory(), BasicTypeInfo.INT_TYPE_INFO)) {
            testHarness.setup();
            testHarness.open();

            final RecordAttributes backlogRecordAttributes =
                    new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
            final RecordAttributes nonBacklogRecordAttributes =
                    new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();

            // Phase 1: every input reports backlog; one backlog attribute is expected per call.
            testHarness.processRecordAttributes(0, backlogRecordAttributes);
            testHarness.processRecordAttributes(1, backlogRecordAttributes);
            testHarness.processRecordAttributes(2, backlogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);

            // Phase 2: inputs switch to non-backlog one by one; only the third switch is
            // expected to flip the emitted attributes.
            testHarness.processRecordAttributes(0, nonBacklogRecordAttributes);
            testHarness.processRecordAttributes(1, nonBacklogRecordAttributes);
            testHarness.processRecordAttributes(2, nonBacklogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            expectedOutput.add(nonBacklogRecordAttributes);

            TestHarnessUtil.assertOutputEquals("Output was not correct", expectedOutput, testHarness.getOutput());
        }
    }

    private static class SingleInputTestOperator
    extends AbstractStreamOperatorV2<String>
    implements MultipleInputStreamOperator<String>,
    Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1L;
        private transient InternalTimerService<VoidNamespace> timerService;
        private final ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor("state", (TypeSerializer)StringSerializer.INSTANCE);

        public SingleInputTestOperator(StreamOperatorParameters<String> parameters) {
            super(parameters, 1);
        }

        public void open() throws Exception {
            super.open();
            this.timerService = this.getInternalTimerService("test-timers", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        }

        public List<Input> getInputs() {
            return Collections.singletonList(new AbstractInput<Tuple2<Integer, String>, String>((AbstractStreamOperatorV2)this, 1){

                public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
                    String[] command = ((String)((Tuple2)element.getValue()).f1).split(":");
                    switch (command[0]) {
                        case "SET_STATE": {
                            ((ValueState)this.getPartitionedState((StateDescriptor)stateDescriptor)).update((Object)command[1]);
                            break;
                        }
                        case "DELETE_STATE": {
                            ((ValueState)this.getPartitionedState((StateDescriptor)stateDescriptor)).clear();
                            break;
                        }
                        case "SET_EVENT_TIME_TIMER": {
                            timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, Long.parseLong(command[1]));
                            break;
                        }
                        case "SET_PROC_TIME_TIMER": {
                            timerService.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, Long.parseLong(command[1]));
                            break;
                        }
                        case "EMIT_STATE": {
                            String stateValue = (String)((ValueState)this.getPartitionedState((StateDescriptor)stateDescriptor)).value();
                            this.output.collect((Object)new StreamRecord((Object)("ON_ELEMENT:" + ((Tuple2)element.getValue()).f0 + ":" + stateValue)));
                            break;
                        }
                        default: {
                            throw new IllegalArgumentException();
                        }
                    }
                }
            });
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            String stateValue = (String)((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).value();
            this.output.collect((Object)new StreamRecord((Object)("ON_EVENT_TIME:" + stateValue)));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            String stateValue = (String)((ValueState)this.getPartitionedState((StateDescriptor)this.stateDescriptor)).value();
            this.output.collect((Object)new StreamRecord((Object)("ON_PROC_TIME:" + stateValue)));
        }
    }

    private static class WatermarkTestingOperator
    extends AbstractStreamOperatorV2<Long>
    implements MultipleInputStreamOperator<Long>,
    Triggerable<Integer, VoidNamespace> {
        private transient InternalTimerService<VoidNamespace> timerService;

        public WatermarkTestingOperator(StreamOperatorParameters<Long> parameters) {
            super(parameters, 3);
        }

        public void open() throws Exception {
            super.open();
            this.timerService = this.getInternalTimerService("test-timers", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            this.output.collect((Object)new StreamRecord((Object)timer.getTimestamp()));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
        }

        private Input<Long> createInput(int idx) {
            return new AbstractInput<Long, Long>((AbstractStreamOperatorV2)this, idx){

                public void processElement(StreamRecord<Long> element) throws Exception {
                    timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, ((Long)element.getValue()).longValue());
                }
            };
        }

        public List<Input> getInputs() {
            return Arrays.asList(this.createInput(1), this.createInput(2), this.createInput(3));
        }
    }

    private static class WatermarkTestingOperatorFactory
    extends AbstractStreamOperatorFactory<Long> {
        private WatermarkTestingOperatorFactory() {
        }

        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
            return (T)((Object)new WatermarkTestingOperator(parameters));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return WatermarkTestingOperator.class;
        }
    }

    private static class TestOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private TestOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new SingleInputTestOperator(parameters));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SingleInputTestOperator.class;
        }
    }
}

