/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorTest;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorV2;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendTestUtils;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedMultiInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class AbstractAsyncStateStreamOperatorV2Test {
    AbstractAsyncStateStreamOperatorV2Test() {
    }

    protected KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness(int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) throws Exception {
        KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = KeyedOneInputStreamOperatorV2TestHarness.create(new TestOperatorFactory(elementOrder), new AbstractAsyncStateStreamOperatorTest.TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, maxParalelism, numSubtasks, subtaskIndex);
        testHarness.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend((StateBackend)new HashMapStateBackend()));
        return testHarness;
    }

    @Test
    void testCreateAsyncExecutionController() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            Assertions.assertThat(testHarness.getBaseOperator()).isInstanceOf(AbstractAsyncStateStreamOperatorV2.class);
            Assertions.assertThat((Object)((AbstractAsyncStateStreamOperatorV2)testHarness.getBaseOperator()).getAsyncExecutionController()).isNotNull();
            Assertions.assertThat((Object)((AbstractAsyncStateStreamOperatorV2)testHarness.getBaseOperator()).getAsyncExecutionController().getStateExecutor()).isNotNull();
        }
    }

    @Test
    void testRecordProcessorWithFirstStateOrder() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.FIRST_STATE_ORDER);){
            testHarness.open();
            SingleInputTestOperator testOperator = (SingleInputTestOperator)testHarness.getBaseOperator();
            CompletableFuture future = testHarness.processElementInternal(new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5")));
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(1);
            testOperator.proceed();
            future.get();
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
        }
    }

    @Test
    void testRecordProcessorWithRecordOrder() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            SingleInputTestOperator testOperator = (SingleInputTestOperator)testHarness.getBaseOperator();
            CompletableFuture future = testHarness.processElementInternal(new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5")));
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            testOperator.proceed();
            future.get();
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testAsyncProcessWithKey() throws Exception {
        testHarness.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend((StateBackend)new HashMapStateBackend()));
        try (KeyedOneInputStreamOperatorV2TestHarness testHarness = KeyedOneInputStreamOperatorV2TestHarness.create(new TestDirectAsyncProcessOperatorFactory(ElementOrder.RECORD_ORDER), new AbstractAsyncStateStreamOperatorTest.TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, 128, 1, 0);){
            testHarness.open();
            SingleInputTestOperatorDirectAsyncProcess testOperator = (SingleInputTestOperatorDirectAsyncProcess)testHarness.getBaseOperator();
            CompletableFuture future = testHarness.processElementInternal(new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5")));
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getProcessed()).isEqualTo(0);
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            testOperator.proceed();
            future.get();
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat((int)testOperator.getProcessed()).isEqualTo(1);
        }
    }

    @Test
    void testCheckpointDrain() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            SingleInputTestOperator testOperator = (SingleInputTestOperator)testHarness.getBaseOperator();
            AsyncExecutionController asyncExecutionController = testOperator.getAsyncExecutionController();
            testOperator.setAsyncKeyedContextElement(new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5")), (KeySelector)new AbstractAsyncStateStreamOperatorTest.TestKeySelector());
            asyncExecutionController.handleRequest(null, StateRequestType.VALUE_GET, null);
            testOperator.postProcessElement();
            Assertions.assertThat((int)asyncExecutionController.getInFlightRecordNum()).isEqualTo(1);
            testHarness.drainStateRequests();
            Assertions.assertThat((int)asyncExecutionController.getInFlightRecordNum()).isEqualTo(0);
        }
    }

    @Test
    void testTimerServiceIsAsync() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            Assertions.assertThat(testHarness.getBaseOperator()).isInstanceOf(AbstractAsyncStateStreamOperatorV2.class);
            Triggerable triggerable = new Triggerable(){

                public void onEventTime(InternalTimer timer) throws Exception {
                }

                public void onProcessingTime(InternalTimer timer) throws Exception {
                }
            };
            Assertions.assertThat((Object)((AbstractAsyncStateStreamOperatorV2)testHarness.getBaseOperator()).getInternalTimerService("test", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, triggerable)).isInstanceOf(InternalTimerServiceAsyncImpl.class);
        }
    }

    @Test
    void testNonRecordProcess() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            SingleInputTestOperator testOperator = (SingleInputTestOperator)testHarness.getBaseOperator();
            testHarness.processElementInternal(new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5")));
            CompletableFuture future = testHarness.processRecordAttributesInternal(new RecordAttributes(false));
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int)testOperator.getAttributeProcessed()).isEqualTo(0);
            testOperator.proceed();
            future.get();
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat((int)testOperator.getAttributeProcessed()).isEqualTo(1);
        }
    }

    @Test
    void testWatermark() throws Exception {
        testHarness.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend((StateBackend)new HashMapStateBackend()));
        try (KeyedOneInputStreamOperatorV2TestHarness testHarness = KeyedOneInputStreamOperatorV2TestHarness.create(new TestWithAsyncProcessTimerOperatorFactory(ElementOrder.RECORD_ORDER), new AbstractAsyncStateStreamOperatorTest.TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, 128, 1, 0);){
            testHarness.open();
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
            testHarness.processElementInternal(new StreamRecord((Object)Tuple2.of((Object)1, (Object)"1")));
            expectedOutput.add(new StreamRecord((Object)"EventTimer-1-1"));
            testHarness.processElementInternal(new StreamRecord((Object)Tuple2.of((Object)1, (Object)"3")));
            expectedOutput.add(new StreamRecord((Object)"EventTimer-1-3"));
            testHarness.processElementInternal(new StreamRecord((Object)Tuple2.of((Object)1, (Object)"6")));
            expectedOutput.add(new StreamRecord((Object)"EventTimer-1-6"));
            testHarness.processElementInternal(new StreamRecord((Object)Tuple2.of((Object)1, (Object)"9")));
            expectedOutput.add(new StreamRecord((Object)"EventTimer-1-9"));
            testHarness.processWatermark(10L);
            expectedOutput.add(new Watermark(10L));
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
        }
    }

    @Test
    void testWatermarkHooks() throws Exception {
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeySelector & Serializable dummyKeySelector = (KeySelector & Serializable)l -> 0;
        List<KeySelector> keySelectors = Arrays.asList(dummyKeySelector, dummyKeySelector, dummyKeySelector);
        WatermarkTestingOperatorFactory factory = new WatermarkTestingOperatorFactory();
        AtomicInteger counter = new AtomicInteger(0);
        factory.setPreProcessFunction((BiFunctionWithException<WatermarkTestingOperator, Watermark, Watermark, Exception>)((BiFunctionWithException)(operator, watermark) -> {
            operator.asyncProcessWithKey(1L, () -> {
                Assertions.assertThat((Object)operator.getCurrentKey()).isEqualTo((Object)1L);
                operator.output(watermark.getTimestamp() + 1000L);
            });
            if (counter.incrementAndGet() % 2 == 0) {
                return null;
            }
            return new Watermark(watermark.getTimestamp() + 1L);
        }));
        factory.setPostProcessFunction((BiConsumerWithException<WatermarkTestingOperator, Watermark, Exception>)((BiConsumerWithException)(operator, watermark) -> operator.output(watermark.getTimestamp() + 100L)));
        try (AsyncKeyedMultiInputStreamOperatorTestHarness testHarness = AsyncKeyedMultiInputStreamOperatorTestHarness.create((StreamOperatorFactory)factory, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, keySelectors, (int)1, (int)1, (int)0);){
            testHarness.setup();
            testHarness.open();
            testHarness.processElement(0, new StreamRecord((Object)1L, 1L));
            testHarness.processElement(0, new StreamRecord((Object)3L, 3L));
            testHarness.processElement(0, new StreamRecord((Object)4L, 4L));
            testHarness.processWatermark(0, new Watermark(2L));
            testHarness.processWatermark(1, new Watermark(2L));
            testHarness.processWatermark(2, new Watermark(2L));
            expectedOutput.add(new StreamRecord((Object)1002L));
            expectedOutput.add(new StreamRecord((Object)1L));
            expectedOutput.add(new StreamRecord((Object)3L));
            expectedOutput.add(new Watermark(3L));
            expectedOutput.add(new StreamRecord((Object)103L));
            testHarness.processWatermark(0, new Watermark(4L));
            testHarness.processWatermark(1, new Watermark(4L));
            testHarness.processWatermark(2, new Watermark(4L));
            expectedOutput.add(new StreamRecord((Object)1004L));
            testHarness.processWatermark(0, new Watermark(5L));
            testHarness.processWatermark(1, new Watermark(5L));
            testHarness.processWatermark(2, new Watermark(5L));
            expectedOutput.add(new StreamRecord((Object)1005L));
            expectedOutput.add(new StreamRecord((Object)4L));
            expectedOutput.add(new Watermark(6L));
            expectedOutput.add(new StreamRecord((Object)106L));
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
        }
    }

    @Test
    void testWatermarkStatus() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            SingleInputTestOperator testOperator = (SingleInputTestOperator)testHarness.getBaseOperator();
            testHarness.processElementInternal(new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5")));
            testHarness.processWatermarkInternal(new Watermark(205L));
            CompletableFuture future = testHarness.processWatermarkStatusInternal(WatermarkStatus.IDLE);
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int)testOperator.watermarkIndex).isEqualTo(-1);
            Assertions.assertThat((boolean)testOperator.watermarkStatus.isIdle()).isTrue();
            testOperator.proceed();
            future.get();
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
            Assertions.assertThat((boolean)testOperator.watermarkStatus.isActive()).isFalse();
            Assertions.assertThat((Collection)testHarness.getOutput()).containsExactly(new Object[]{new StreamRecord((Object)"EventTimer-5-105"), new Watermark(205L), WatermarkStatus.IDLE});
        }
    }

    @Test
    void testIdleWatermarkHandling() throws Exception {
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        KeySelector & Serializable dummyKeySelector = (KeySelector & Serializable)l -> 0;
        List<KeySelector> keySelectors = Arrays.asList(dummyKeySelector, dummyKeySelector, dummyKeySelector);
        try (AsyncKeyedMultiInputStreamOperatorTestHarness testHarness = AsyncKeyedMultiInputStreamOperatorTestHarness.create((StreamOperatorFactory)new WatermarkTestingOperatorFactory(), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, keySelectors, (int)1, (int)1, (int)0);){
            testHarness.setup();
            testHarness.open();
            testHarness.processElement(0, new StreamRecord((Object)1L, 1L));
            testHarness.processElement(0, new StreamRecord((Object)3L, 3L));
            testHarness.processElement(0, new StreamRecord((Object)4L, 4L));
            testHarness.processWatermark(0, new Watermark(1L));
            Assertions.assertThat((Collection)testHarness.getOutput()).isEmpty();
            testHarness.processWatermarkStatus(1, WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
            testHarness.processWatermarkStatus(2, WatermarkStatus.IDLE);
            expectedOutput.add(new StreamRecord((Object)1L));
            expectedOutput.add(new Watermark(1L));
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
            testHarness.processWatermark(0, new Watermark(3L));
            expectedOutput.add(new StreamRecord((Object)3L));
            expectedOutput.add(new Watermark(3L));
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
            testHarness.processWatermarkStatus(1, WatermarkStatus.ACTIVE);
            testHarness.processWatermark(0, new Watermark(4L));
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
        }
    }

    @Test
    void testIdlenessForwarding() throws Exception {
        ConcurrentLinkedQueue<WatermarkStatus> expectedOutput = new ConcurrentLinkedQueue<WatermarkStatus>();
        KeySelector & Serializable dummyKeySelector = (KeySelector & Serializable)l -> 0;
        List<KeySelector> keySelectors = Arrays.asList(dummyKeySelector, dummyKeySelector, dummyKeySelector);
        try (AsyncKeyedMultiInputStreamOperatorTestHarness testHarness = AsyncKeyedMultiInputStreamOperatorTestHarness.create((StreamOperatorFactory)new WatermarkTestingOperatorFactory(), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, keySelectors, (int)1, (int)1, (int)0);){
            testHarness.setup();
            testHarness.open();
            testHarness.processWatermarkStatus(0, WatermarkStatus.IDLE);
            testHarness.processWatermarkStatus(1, WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
            testHarness.processWatermarkStatus(2, WatermarkStatus.IDLE);
            expectedOutput.add(WatermarkStatus.IDLE);
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
        }
    }

    @Test
    void testRecordAttributesForwarding() throws Exception {
        ConcurrentLinkedQueue<RecordAttributes> expectedOutput = new ConcurrentLinkedQueue<RecordAttributes>();
        KeySelector & Serializable dummyKeySelector = (KeySelector & Serializable)l -> 0;
        List<KeySelector> keySelectors = Arrays.asList(dummyKeySelector, dummyKeySelector, dummyKeySelector);
        try (AsyncKeyedMultiInputStreamOperatorTestHarness testHarness = AsyncKeyedMultiInputStreamOperatorTestHarness.create((StreamOperatorFactory)new WatermarkTestingOperatorFactory(), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, keySelectors, (int)1, (int)1, (int)0);){
            testHarness.setup();
            testHarness.open();
            RecordAttributes backlogRecordAttributes = new RecordAttributesBuilder(Collections.emptyList()).setBacklog(true).build();
            RecordAttributes nonBacklogRecordAttributes = new RecordAttributesBuilder(Collections.emptyList()).setBacklog(false).build();
            testHarness.processRecordAttributes(0, backlogRecordAttributes);
            testHarness.processRecordAttributes(1, backlogRecordAttributes);
            testHarness.processRecordAttributes(2, backlogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            testHarness.processRecordAttributes(0, nonBacklogRecordAttributes);
            testHarness.processRecordAttributes(1, nonBacklogRecordAttributes);
            testHarness.processRecordAttributes(2, nonBacklogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            expectedOutput.add(backlogRecordAttributes);
            expectedOutput.add(nonBacklogRecordAttributes);
            TestHarnessUtil.assertOutputEquals((String)"Output was not correct", expectedOutput, (Queue)testHarness.getOutput());
        }
    }

    private static class WatermarkTestingOperator
    extends AbstractAsyncStateStreamOperatorV2<Long>
    implements MultipleInputStreamOperator<Long>,
    Triggerable<Integer, VoidNamespace> {
        private transient InternalTimerService<VoidNamespace> timerService;
        private BiFunctionWithException<WatermarkTestingOperator, Watermark, Watermark, Exception> preProcessFunction;
        private BiConsumerWithException<WatermarkTestingOperator, Watermark, Exception> postProcessFunction;

        public WatermarkTestingOperator(StreamOperatorParameters<Long> parameters, BiFunctionWithException<WatermarkTestingOperator, Watermark, Watermark, Exception> preProcessFunction, BiConsumerWithException<WatermarkTestingOperator, Watermark, Exception> postProcessFunction) {
            super(parameters, 3);
            this.preProcessFunction = preProcessFunction;
            this.postProcessFunction = postProcessFunction;
        }

        public void output(Long o) {
            this.output.collect((Object)new StreamRecord((Object)o));
        }

        public void open() throws Exception {
            super.open();
            this.timerService = this.getInternalTimerService("test-timers", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        }

        public Watermark preProcessWatermark(Watermark watermark) throws Exception {
            return this.preProcessFunction == null ? watermark : (Watermark)this.preProcessFunction.apply((Object)this, (Object)watermark);
        }

        public void postProcessWatermark(Watermark watermark) throws Exception {
            if (this.postProcessFunction != null) {
                this.postProcessFunction.accept((Object)this, (Object)watermark);
            }
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            Assertions.assertThat((Object)this.getCurrentKey()).isEqualTo(timer.getKey());
            this.output.collect((Object)new StreamRecord((Object)timer.getTimestamp()));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            Assertions.assertThat((Object)this.getCurrentKey()).isEqualTo(timer.getKey());
        }

        private Input<Long> createInput(int idx) {
            return new AbstractInput<Long, Long>((AbstractStreamOperatorV2)this, idx){

                public void processElement(StreamRecord<Long> element) throws Exception {
                    timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, ((Long)element.getValue()).longValue());
                }
            };
        }

        public List<Input> getInputs() {
            return Arrays.asList(this.createInput(1), this.createInput(2), this.createInput(3));
        }
    }

    private static class WatermarkTestingOperatorFactory
    extends AbstractStreamOperatorFactory<Long> {
        private BiFunctionWithException<WatermarkTestingOperator, Watermark, Watermark, Exception> preProcessFunction;
        private BiConsumerWithException<WatermarkTestingOperator, Watermark, Exception> postProcessFunction;

        private WatermarkTestingOperatorFactory() {
        }

        public void setPreProcessFunction(BiFunctionWithException<WatermarkTestingOperator, Watermark, Watermark, Exception> preProcessFunction) {
            this.preProcessFunction = preProcessFunction;
        }

        public void setPostProcessFunction(BiConsumerWithException<WatermarkTestingOperator, Watermark, Exception> postProcessFunction) {
            this.postProcessFunction = postProcessFunction;
        }

        public <T extends StreamOperator<Long>> T createStreamOperator(StreamOperatorParameters<Long> parameters) {
            return (T)((Object)new WatermarkTestingOperator(parameters, this.preProcessFunction, this.postProcessFunction));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return WatermarkTestingOperator.class;
        }
    }

    private static class SingleInputTestOperatorWithAsyncProcessTimer
    extends SingleInputTestOperator {
        SingleInputTestOperatorWithAsyncProcessTimer(StreamOperatorParameters<String> parameters, ElementOrder elementOrder) {
            super(parameters, elementOrder);
            this.input = new AbstractInput<Tuple2<Integer, String>, String>((AbstractStreamOperatorV2)this, 1){

                public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
                    processed.incrementAndGet();
                    timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, Long.parseLong((String)((Tuple2)element.getValue()).f1));
                }
            };
        }

        @Override
        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            this.asyncProcessWithKey((Integer)timer.getKey(), () -> super.onEventTime(timer));
        }

        @Override
        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            this.asyncProcessWithKey((Integer)timer.getKey(), () -> super.onProcessingTime(timer));
        }
    }

    private static class TestWithAsyncProcessTimerOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private final ElementOrder elementOrder;

        TestWithAsyncProcessTimerOperatorFactory(ElementOrder elementOrder) {
            this.elementOrder = elementOrder;
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new SingleInputTestOperatorWithAsyncProcessTimer(parameters, this.elementOrder));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SingleInputTestOperatorWithAsyncProcessTimer.class;
        }
    }

    private static class SingleInputTestOperatorDirectAsyncProcess
    extends SingleInputTestOperator {
        SingleInputTestOperatorDirectAsyncProcess(StreamOperatorParameters<String> parameters, ElementOrder elementOrder) {
            super(parameters, elementOrder);
            this.input = new AbstractInput<Tuple2<Integer, String>, String>((AbstractStreamOperatorV2)this, 1){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
                    this.asyncProcessWithKey((Integer)((Tuple2)element.getValue()).f0, () -> processed.incrementAndGet());
                    Object object = objectToWait;
                    synchronized (object) {
                        objectToWait.wait();
                    }
                    processed.incrementAndGet();
                }
            };
        }
    }

    private static class TestDirectAsyncProcessOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private final ElementOrder elementOrder;

        TestDirectAsyncProcessOperatorFactory(ElementOrder elementOrder) {
            this.elementOrder = elementOrder;
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new SingleInputTestOperatorDirectAsyncProcess(parameters, this.elementOrder));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SingleInputTestOperatorDirectAsyncProcess.class;
        }
    }

    private static class SingleInputTestOperator
    extends AbstractAsyncStateStreamOperatorV2<String>
    implements MultipleInputStreamOperator<String>,
    Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1L;
        final AtomicInteger processed = new AtomicInteger(0);
        final AtomicInteger attributeProcessed = new AtomicInteger(0);
        private final ElementOrder elementOrder;
        final Object objectToWait = new Object();
        Input input;
        private WatermarkStatus watermarkStatus = new WatermarkStatus(-1);
        private int watermarkIndex = -1;
        InternalTimerService<VoidNamespace> timerService;

        public SingleInputTestOperator(StreamOperatorParameters<String> parameters, ElementOrder elementOrder) {
            super(parameters, 1);
            this.elementOrder = elementOrder;
            this.input = new AbstractInput<Tuple2<Integer, String>, String>((AbstractStreamOperatorV2)this, 1){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
                    processed.incrementAndGet();
                    Object object = objectToWait;
                    synchronized (object) {
                        objectToWait.wait();
                    }
                    timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, (long)((Integer)((Tuple2)element.getValue()).f0).intValue() + 100L);
                }
            };
        }

        public void open() throws Exception {
            super.open();
            this.timerService = this.getInternalTimerService("processing timer", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        }

        public List<Input> getInputs() {
            return Collections.singletonList(this.input);
        }

        public ElementOrder getElementOrder() {
            return this.elementOrder;
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            Assertions.assertThat((Object)this.getCurrentKey()).isEqualTo(timer.getKey());
            this.output.collect((Object)new StreamRecord((Object)("EventTimer-" + timer.getKey() + "-" + timer.getTimestamp())));
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
            Assertions.assertThat((Object)this.getCurrentKey()).isEqualTo(timer.getKey());
            this.output.collect((Object)new StreamRecord((Object)("ProcessingTimer-" + timer.getKey() + "-" + timer.getTimestamp())));
        }

        public void processRecordAttributes(RecordAttributes recordAttributes, int inputId) throws Exception {
            super.processRecordAttributes(recordAttributes, inputId);
            this.attributeProcessed.incrementAndGet();
        }

        public void processWatermarkStatus(WatermarkStatus watermarkStatus, int index) throws Exception {
            super.processWatermarkStatus(watermarkStatus, index);
            this.watermarkStatus = watermarkStatus;
            this.watermarkIndex = index;
        }

        public int getProcessed() {
            return this.processed.get();
        }

        public int getAttributeProcessed() {
            return this.attributeProcessed.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void proceed() {
            Object object = this.objectToWait;
            synchronized (object) {
                this.objectToWait.notify();
            }
        }
    }

    private static class TestOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private final ElementOrder elementOrder;

        TestOperatorFactory(ElementOrder elementOrder) {
            this.elementOrder = elementOrder;
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new SingleInputTestOperator(parameters, this.elementOrder));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SingleInputTestOperator.class;
        }
    }

    private static class KeyedOneInputStreamOperatorV2TestHarness<K, IN, OUT>
    extends AsyncKeyedOneInputStreamOperatorTestHarness<K, IN, OUT> {
        public static <K, IN, OUT> KeyedOneInputStreamOperatorV2TestHarness<K, IN, OUT> create(StreamOperatorFactory<OUT> operatorFactory, KeySelector<IN, K> keySelector, TypeInformation<K> keyType, int maxParallelism, int numSubtasks, int subtaskIndex) throws Exception {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            CompletableFuture future = new CompletableFuture();
            executorService.execute(() -> {
                try {
                    future.complete(new KeyedOneInputStreamOperatorV2TestHarness(executorService, operatorFactory, keySelector, keyType, maxParallelism, numSubtasks, subtaskIndex));
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            return (KeyedOneInputStreamOperatorV2TestHarness)((Object)future.get());
        }

        public KeyedOneInputStreamOperatorV2TestHarness(ExecutorService executorService, StreamOperatorFactory<OUT> operatorFactory, KeySelector<IN, K> keySelector, TypeInformation<K> keyType, int maxParallelism, int numSubtasks, int subtaskIndex) throws Exception {
            super(executorService, operatorFactory, keySelector, keyType, maxParallelism, numSubtasks, subtaskIndex);
        }

        public StreamOperator<OUT> getBaseOperator() {
            return this.operator;
        }
    }
}

