/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.async;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.streaming.util.retryable.RetryPredicates;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class AsyncWaitOperatorTest
extends TestLogger {
    /**
     * Timeout constant with value 1000.
     * NOTE(review): the harness and factory calls visible in this file pass the literal
     * {@code 1000L}; presumably that is this constant inlined by the compiler -- confirm.
     */
    private static final long TIMEOUT = 1000L;
    /** JUnit timeout rule configured with 100 seconds. */
    @Rule
    public Timeout timeoutRule = new Timeout(100L, TimeUnit.SECONDS);
    /**
     * Shared-object rule; {@code testIgnoreAsyncOperatorRecordsOnDrain} registers its list of
     * result futures through it.
     */
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();
    /**
     * Fixed-delay retry strategy built with arguments {@code (2, 10L)} that retries when
     * {@code RetryPredicates.EMPTY_RESULT_PREDICATE} matches the result.
     * NOTE(review): {@code (2, 10L)} presumably mean max attempts and delay in ms -- confirm.
     */
    private static AsyncRetryStrategy emptyResultFixedDelayRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(2, 10L).ifResult((Predicate)RetryPredicates.EMPTY_RESULT_PREDICATE).build();
    /**
     * Fixed-delay retry strategy built with arguments {@code (2, 10L)} that retries when
     * {@code RetryPredicates.HAS_EXCEPTION_PREDICATE} matches the thrown exception.
     */
    private static AsyncRetryStrategy exceptionRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(2, 10L).ifException((Predicate)RetryPredicates.HAS_EXCEPTION_PREDICATE).build();

    /** Runs the shared event-time scenario in {@code ORDERED} output mode. */
    @Test
    public void testEventTimeOrdered() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.ORDERED;
        testEventTime(outputMode);
    }

    /** Runs the shared event-time scenario in {@code UNORDERED} output mode. */
    @Test
    public void testWaterMarkUnordered() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.UNORDERED;
        testEventTime(outputMode);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends two records, a watermark with timestamp 2 and a third record through a harness built
     * around {@code MyAsyncFunction}, then checks the emitted elements.
     *
     * <p>In {@code ORDERED} mode the output must equal the expected sequence exactly. In any other
     * mode the watermark must be at index 2 and the record with timestamp 3 at index 3, and the
     * whole output is additionally compared after sorting.
     *
     * @param mode output mode handed to the test harness
     * @throws Exception if the harness fails while processing or closing
     */
    private void testEventTime(AsyncDataStream.OutputMode mode) throws Exception {
        OneInputStreamOperatorTestHarness testHarness = AsyncWaitOperatorTest.createTestHarness(new MyAsyncFunction(), 1000L, 2, mode);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();

        synchronized (testHarness.getCheckpointLock()) {
            testHarness.processElement(new StreamRecord<>(1, 1L));
            testHarness.processElement(new StreamRecord<>(2, 2L));
            testHarness.processWatermark(new Watermark(2L));
            testHarness.processElement(new StreamRecord<>(3, 3L));
        }

        // The lock is deliberately released and re-acquired before shutting the harness down,
        // exactly as in the original control flow.
        synchronized (testHarness.getCheckpointLock()) {
            testHarness.endInput();
            testHarness.close();
        }

        expectedOutput.add(new StreamRecord<>(2, 1L));
        expectedOutput.add(new StreamRecord<>(4, 2L));
        expectedOutput.add(new Watermark(2L));
        expectedOutput.add(new StreamRecord<>(6, 3L));

        if (AsyncDataStream.OutputMode.ORDERED == mode) {
            TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());
            return;
        }

        Object[] emitted = testHarness.getOutput().toArray();
        // Fail with a readable assertion instead of an ArrayIndexOutOfBoundsException when the
        // operator emitted fewer elements than the positional checks below need.
        Assert.assertTrue("Expected at least 4 output elements but got " + emitted.length, emitted.length >= 4);
        Assert.assertEquals("Watermark should be at index 2", new Watermark(2L), emitted[2]);
        Assert.assertEquals("StreamRecord 3 should be at the end", new StreamRecord<>(6, 3L), emitted[3]);
        TestHarnessUtil.assertOutputEqualsSorted("Output for StreamRecords does not match", expectedOutput, testHarness.getOutput(), new StreamRecordComparator());
    }

    /** Runs the shared processing-time scenario in {@code ORDERED} output mode. */
    @Test
    public void testProcessingTimeOrdered() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.ORDERED;
        testProcessingTime(outputMode);
    }

    /** Runs the shared processing-time scenario in {@code UNORDERED} output mode. */
    @Test
    public void testProcessingUnordered() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.UNORDERED;
        testProcessingTime(outputMode);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Pushes the records {@code (v, timestamp v)} for {@code v = 1..8} through a harness built
     * around {@code MyAsyncFunction} and expects the records {@code (2 * v, timestamp v)} back.
     *
     * <p>In {@code ORDERED} mode the output is compared in emission order, otherwise after
     * sorting with {@code StreamRecordComparator}.
     *
     * @param mode output mode handed to the test harness
     * @throws Exception if the harness fails while processing or closing
     */
    private void testProcessingTime(AsyncDataStream.OutputMode mode) throws Exception {
        final int recordCount = 8;
        OneInputStreamOperatorTestHarness testHarness = AsyncWaitOperatorTest.createTestHarness(new MyAsyncFunction(), 1000L, 6, mode);
        ArrayDeque<Object> expectedOutput = new ArrayDeque<Object>();
        testHarness.open();

        synchronized (testHarness.getCheckpointLock()) {
            for (int value = 1; value <= recordCount; value++) {
                testHarness.processElement(new StreamRecord<>(value, (long) value));
            }
        }

        for (int value = 1; value <= recordCount; value++) {
            expectedOutput.add(new StreamRecord<>(2 * value, (long) value));
        }

        synchronized (testHarness.getCheckpointLock()) {
            testHarness.endInput();
            testHarness.close();
        }

        if (mode == AsyncDataStream.OutputMode.ORDERED) {
            TestHarnessUtil.assertOutputEquals("ORDERED Output was not correct.", expectedOutput, testHarness.getOutput());
        } else {
            TestHarnessUtil.assertOutputEqualsSorted("UNORDERED Output was not correct.", expectedOutput, testHarness.getOutput(), new StreamRecordComparator());
        }
    }

    /**
     * Runs the operator chain produced by {@code createChainedVertex} inside a one-input task
     * harness, feeds the values 5..9 and compares the sorted output against 22, 26, 30, 34, 38.
     *
     * <p>NOTE(review): the expected values correspond to "double, add one, double" per input;
     * {@code MyAsyncFunction} is defined outside this view, so confirm it doubles its input.
     */
    @Test
    public void testOperatorChainWithProcessingTime() throws Exception {
        final long initialTimestamp = 0L;
        final int elementCount = 5;

        JobVertex chainedVertex = createChainedVertex((AsyncFunction<Integer, Integer>) new MyAsyncFunction(), (AsyncFunction<Integer, Integer>) new MyAsyncFunction());
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        testHarness.taskConfig = chainedVertex.getConfiguration();

        // Install the operator factory of the chained vertex into the harness' stream config.
        StreamConfig harnessConfig = testHarness.getStreamConfig();
        StreamConfig chainConfig = new StreamConfig(chainedVertex.getConfiguration());
        harnessConfig.setStreamOperatorFactory(chainConfig.getStreamOperatorFactory(AsyncWaitOperatorTest.class.getClassLoader()));

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        for (int offset = 0; offset < elementCount; offset++) {
            testHarness.processElement(new StreamRecord<>(5 + offset, initialTimestamp + offset));
        }
        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        LinkedList<Object> expectedOutput = new LinkedList<Object>();
        for (int offset = 0; offset < elementCount; offset++) {
            expectedOutput.add(new StreamRecord<>(22 + 4 * offset, initialTimestamp + offset));
        }
        TestHarnessUtil.assertOutputEqualsSorted("Test for chained operator with AsyncWaitOperator failed", expectedOutput, testHarness.getOutput(), new StreamRecordComparator());
    }

    /**
     * Builds a job graph {@code fromElements(1, 2, 3) -> orderedWait(firstFunction) -> map(+1) ->
     * unorderedWait(secondFunction) -> map(identity, new chain) -> discarding sink} and returns
     * the second vertex (index 1) in topological order.
     *
     * <p>The first map function sets its addend in {@code open} rather than at construction, so
     * it only works if {@code open} ran before {@code map} is called.
     *
     * @param firstFunction async function used for the ordered wait (capacity 6)
     * @param secondFunction async function used for the unordered wait (capacity 3)
     * @return the vertex at index 1 of the topologically sorted job graph
     */
    private JobVertex createChainedVertex(AsyncFunction<Integer, Integer> firstFunction, AsyncFunction<Integer, Integer> secondFunction) {
        StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        chainEnv.setParallelism(2);

        // Declared as DataStream<Integer>: the wait/map calls below return operators that are not
        // DataStreamSource instances, so the variable must use the common supertype.
        DataStream<Integer> input = chainEnv.fromElements(1, 2, 3);
        input = AsyncDataStream.orderedWait(input, firstFunction, 1000L, TimeUnit.MILLISECONDS, 6);
        input = input.map(new RichMapFunction<Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            private Integer initialValue = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                this.initialValue = 1;
            }

            @Override
            public Integer map(Integer value) throws Exception {
                return this.initialValue + value;
            }
        });
        input = AsyncDataStream.unorderedWait(input, secondFunction, 1000L, TimeUnit.MILLISECONDS, 3);
        input.map(new MapFunction<Integer, Integer>() {
            private static final long serialVersionUID = 5162085254238405527L;

            @Override
            public Integer map(Integer value) throws Exception {
                return value;
            }
        }).startNewChain().addSink(new DiscardingSink<Integer>());

        JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
        List<JobVertex> sortedVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
        Assert.assertEquals(3, sortedVertices.size());
        return sortedVertices.get(1);
    }

    /**
     * Checkpoints a task whose async function ({@code LazyAsyncFunction}) still holds four
     * in-flight elements, restores that snapshot into a second task that uses
     * {@code MyAsyncFunction}, feeds four more elements and expects the restored task to emit
     * the records {@code (2 * v, timestamp v)} for {@code v = 1..8} in order.
     *
     * <p>Checkpoint barriers are removed from the restored task's output before the comparison.
     */
    @Test
    public void testStateSnapshotAndRestore() throws Exception {
        final long checkpointId = 1L;
        final long checkpointTimestamp = 1L;

        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        AsyncWaitOperatorFactory factory = new AsyncWaitOperatorFactory((AsyncFunction)new LazyAsyncFunction(), 1000L, 4, AsyncDataStream.OutputMode.ORDERED);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        OperatorID operatorID = new OperatorID(42L, 4711L);
        streamConfig.setStreamOperatorFactory((StreamOperatorFactory)factory);
        streamConfig.setOperatorID(operatorID);
        TestTaskStateManager taskStateManagerMock = testHarness.getTaskStateManager();
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        OneInputStreamTask task = testHarness.getTask();

        for (int value = 1; value <= 4; value++) {
            testHarness.processElement(new StreamRecord<>(value, (long) value));
        }
        testHarness.waitForInputProcessing();

        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointTimestamp);
        task.triggerCheckpointAsync(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
        // Block until the task state manager has received the checkpoint report.
        taskStateManagerMock.getWaitForReportLatch().await();
        Assert.assertEquals(checkpointId, taskStateManagerMock.getReportedCheckpointId());

        // countDown() is only invoked after the snapshot has been reported.
        LazyAsyncFunction.countDown();
        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // Second task: same operator id, state taken from the snapshot above.
        TaskStateSnapshot subtaskStates = taskStateManagerMock.getLastJobManagerTaskStateSnapshot();
        OneInputStreamTaskTestHarness restoredTaskHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
        restoredTaskHarness.setTaskStateSnapshot(checkpointId, subtaskStates);
        restoredTaskHarness.setupOutputForSingletonOperatorChain();
        AsyncWaitOperatorFactory restoredOperator = new AsyncWaitOperatorFactory((AsyncFunction)new MyAsyncFunction(), 1000L, 6, AsyncDataStream.OutputMode.ORDERED);
        restoredTaskHarness.getStreamConfig().setStreamOperatorFactory((StreamOperatorFactory)restoredOperator);
        restoredTaskHarness.getStreamConfig().setOperatorID(operatorID);
        restoredTaskHarness.invoke();
        restoredTaskHarness.waitForTaskRunning();
        OneInputStreamTask restoredTask = restoredTaskHarness.getTask();

        restoredTaskHarness.processElement(new StreamRecord<>(5, 5L));
        restoredTaskHarness.processElement(new StreamRecord<>(6, 6L));
        restoredTaskHarness.processElement(new StreamRecord<>(7, 7L));
        // Take another checkpoint while elements are being processed and wait for it to finish.
        restoredTask.triggerCheckpointAsync(new CheckpointMetaData(checkpointId, checkpointTimestamp), CheckpointOptions.forCheckpointWithDefaultLocation()).get();
        restoredTaskHarness.processElement(new StreamRecord<>(8, 8L));
        restoredTaskHarness.endInput();
        restoredTaskHarness.waitForTaskCompletion();

        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        for (int value = 1; value <= 8; value++) {
            expectedOutput.add(new StreamRecord<>(2 * value, (long) value));
        }
        restoredTaskHarness.getOutput().removeIf(record -> record instanceof CheckpointBarrier);
        TestHarnessUtil.assertOutputEquals("StateAndRestored Test Output was not correct.", expectedOutput, restoredTaskHarness.getOutput());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Enables object reuse on the harness and processes one and the same {@code StreamRecord}
     * and {@code Tuple1} instance four times, mutating value and timestamp to {@code v} before
     * each call ({@code v = 1..4}). The output must be the records {@code (2 * v, timestamp v)}
     * in order, which can only hold if earlier inputs are not affected by later mutations.
     */
    @Test
    public void testObjectReused() throws Exception {
        TypeSerializer[] fieldSerializers = new TypeSerializer[]{IntSerializer.INSTANCE};
        TupleSerializer inputSerializer = new TupleSerializer(Tuple1.class, fieldSerializers);
        AsyncWaitOperatorFactory factory = new AsyncWaitOperatorFactory((AsyncFunction)new InputReusedAsyncFunction(), 1000L, 4, AsyncDataStream.OutputMode.ORDERED);
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(factory, inputSerializer);
        testHarness.getExecutionConfig().enableObjectReuse();

        Tuple1 reusedTuple = new Tuple1();
        StreamRecord reusedRecord = new StreamRecord((Object)reusedTuple, -1L);
        testHarness.setup();
        testHarness.open();

        synchronized (testHarness.getCheckpointLock()) {
            for (int value = 1; value <= 4; value++) {
                // Mutate the shared instances in place before handing them to the operator again.
                reusedTuple.setFields((Object)value);
                reusedRecord.setTimestamp((long) value);
                testHarness.processElement(reusedRecord);
            }
        }

        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        for (int value = 1; value <= 4; value++) {
            expectedOutput.add(new StreamRecord<>(2 * value, (long) value));
        }

        synchronized (testHarness.getCheckpointLock()) {
            testHarness.endInput();
            testHarness.close();
        }
        TestHarnessUtil.assertOutputEquals("StateAndRestoredWithObjectReuse Test Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Timeout scenario with {@code LazyAsyncFunction}: a {@code TimeoutException} is expected as
     * failure cause and only the record {@code (2, timestamp 5)} is expected as output.
     */
    @Test
    public void testAsyncTimeoutFailure() throws Exception {
        final Optional<Class<? extends Throwable>> expectedFailure = Optional.of(TimeoutException.class);
        testAsyncTimeout(new LazyAsyncFunction(), expectedFailure, new StreamRecord<>(2, 5L));
    }

    /**
     * Timeout scenario with {@code IgnoreTimeoutLazyAsyncFunction}: no failure is expected and
     * the output must be {@code (3, timestamp 0)} followed by {@code (2, timestamp 5)}.
     */
    @Test
    public void testAsyncTimeoutIgnore() throws Exception {
        final Optional<Class<? extends Throwable>> noFailure = Optional.empty();
        testAsyncTimeout(new IgnoreTimeoutLazyAsyncFunction(), noFailure, new StreamRecord<>(3, 0L), new StreamRecord<>(2, 5L));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes element 1 at processing time 0 and element 2 at processing time 5 with a timeout
     * argument of 10, advances the processing time to 11, calls
     * {@code LazyAsyncFunction.countDown()} and then closes the harness.
     *
     * @param lazyAsyncFunction async function under test
     * @param expectedException if present, the environment must have recorded an external
     *     failure cause whose cause chain contains this type; if empty, the failure cause is not
     *     inspected
     * @param expectedRecords records the harness output must equal, in order
     * @throws Exception if the harness fails while processing or closing
     */
    private void testAsyncTimeout(LazyAsyncFunction lazyAsyncFunction, Optional<Class<? extends Throwable>> expectedException, StreamRecord<Integer> ... expectedRecords) throws Exception {
        final long timeout = 10L;
        OneInputStreamOperatorTestHarness testHarness = AsyncWaitOperatorTest.createTestHarness(lazyAsyncFunction, timeout, 2, AsyncDataStream.OutputMode.ORDERED);
        MockEnvironment mockEnvironment = testHarness.getEnvironment();
        mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
        ConcurrentLinkedQueue<StreamRecord<Integer>> expectedOutput = new ConcurrentLinkedQueue<StreamRecord<Integer>>();
        testHarness.open();
        testHarness.setProcessingTime(0L);

        synchronized (testHarness.getCheckpointLock()) {
            testHarness.processElement(new StreamRecord<>(1, 0L));
            testHarness.setProcessingTime(5L);
            testHarness.processElement(new StreamRecord<>(2, 5L));
        }

        // Processing time 11: past timeout for the element from time 0, not for the one from time 5.
        testHarness.setProcessingTime(timeout + 1L);
        // countDown() is static (see its class-qualified use elsewhere in this file), so it is
        // invoked through the class rather than through the instance.
        LazyAsyncFunction.countDown();

        synchronized (testHarness.getCheckpointLock()) {
            testHarness.endInput();
            testHarness.close();
        }

        expectedOutput.addAll(Arrays.asList(expectedRecords));
        TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());

        if (expectedException.isPresent()) {
            Assert.assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent());
            Assert.assertTrue(ExceptionUtils.findThrowable((Throwable)mockEnvironment.getActualExternalFailureCause().get(), expectedException.get()).isPresent());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes a single element (42, timestamp 1) and, after closing the harness, checks that
     * the output is exactly the record {@code (84, timestamp 1)} and that the processing time
     * service reports zero active timers.
     */
    @Test
    public void testTimeoutCleanup() throws Exception {
        OneInputStreamOperatorTestHarness harness = AsyncWaitOperatorTest.createTestHarness(new MyAsyncFunction(), 1000L, 1, AsyncDataStream.OutputMode.UNORDERED);
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(42, 1L);
        }

        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        List<Object> emitted = new ArrayList<Object>(harness.getOutput());
        Assert.assertEquals(Arrays.asList(new StreamRecord<>(84, 1L)), emitted);

        int activeTimers = harness.getProcessingTimeService().getNumActiveTimers();
        Assert.assertEquals(0L, (long) activeTimers);
    }

    /**
     * Processes one element with {@code TimeoutAfterCompletionTestFunction}, registers an extra
     * timer 1000 after the current processing time, counts down the function's completion
     * trigger, waits for the extra timer and then asserts that the single input record was
     * emitted and that the function's {@code TIMED_OUT} flag is still false.
     */
    @Test
    public void testTimeoutAfterComplete() throws Exception {
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO);
        AsyncWaitOperatorFactory operatorFactory = new AsyncWaitOperatorFactory((AsyncFunction)new TimeoutAfterCompletionTestFunction(), 1000L, 1, AsyncDataStream.OutputMode.UNORDERED);

        try (StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)operatorFactory).build()) {
            harness.processElement(new StreamRecord<>(1));

            // No-op timer used purely as a marker that the registered processing time has passed.
            ScheduledFuture markerTimer = harness.getTimerService().registerTimer(harness.getTimerService().getCurrentProcessingTime() + 1000L, ts -> {});
            TimeoutAfterCompletionTestFunction.COMPLETION_TRIGGER.countDown();
            markerTimer.get();
            harness.processAll();

            HashSet<Object> emitted = new HashSet<Object>(harness.getOutput());
            Assert.assertEquals(Collections.singleton(new StreamRecord<>(1)), emitted);
            Assert.assertFalse("no timeout expected", TimeoutAfterCompletionTestFunction.TIMED_OUT.get());
        }
    }

    /** User-exception scenario: {@code ORDERED} output mode, no retry strategy. */
    @Test
    public void testOrderedWaitUserExceptionHandling() throws Exception {
        final AsyncRetryStrategy retryStrategy = (AsyncRetryStrategy)AsyncRetryStrategies.NO_RETRY_STRATEGY;
        testUserExceptionHandling(AsyncDataStream.OutputMode.ORDERED, retryStrategy);
    }

    /** User-exception scenario: {@code ORDERED} output mode, {@code exceptionRetryStrategy}. */
    @Test
    public void testOrderedWaitUserExceptionHandlingWithRetry() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.ORDERED;
        testUserExceptionHandling(outputMode, exceptionRetryStrategy);
    }

    /** User-exception scenario: {@code UNORDERED} output mode, no retry strategy. */
    @Test
    public void testUnorderedWaitUserExceptionHandling() throws Exception {
        final AsyncRetryStrategy retryStrategy = (AsyncRetryStrategy)AsyncRetryStrategies.NO_RETRY_STRATEGY;
        testUserExceptionHandling(AsyncDataStream.OutputMode.UNORDERED, retryStrategy);
    }

    /** User-exception scenario: {@code UNORDERED} output mode, {@code exceptionRetryStrategy}. */
    @Test
    public void testUnorderedWaitUserExceptionHandlingWithRetry() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.UNORDERED;
        testUserExceptionHandling(outputMode, exceptionRetryStrategy);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes one element with {@code UserExceptionAsyncFunction} and, after the harness is
     * closed, asserts that the environment recorded an external failure cause.
     *
     * @param outputMode output mode handed to the test harness
     * @param asyncRetryStrategy retry strategy handed to the test harness
     * @throws Exception if the harness fails while processing or closing
     */
    private void testUserExceptionHandling(AsyncDataStream.OutputMode outputMode, AsyncRetryStrategy asyncRetryStrategy) throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> harness = AsyncWaitOperatorTest.createTestHarnessWithRetry(new UserExceptionAsyncFunction(), 1000L, 2, outputMode, asyncRetryStrategy);
        // Any Throwable is declared as an expected external failure for this environment.
        harness.getEnvironment().setExpectedExternalFailureCause(Throwable.class);
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(1, 1L);
        }

        synchronized (harness.getCheckpointLock()) {
            harness.endInput();
            harness.close();
        }

        boolean failureRecorded = harness.getEnvironment().getActualExternalFailureCause().isPresent();
        Assert.assertTrue(failureRecorded);
    }

    /** Timeout-exception scenario: {@code ORDERED} output mode, no retry strategy. */
    @Test
    public void testOrderedWaitTimeoutHandling() throws Exception {
        final AsyncRetryStrategy retryStrategy = (AsyncRetryStrategy)AsyncRetryStrategies.NO_RETRY_STRATEGY;
        testTimeoutExceptionHandling(AsyncDataStream.OutputMode.ORDERED, retryStrategy);
    }

    /**
     * Timeout-exception scenario: {@code ORDERED} output mode,
     * {@code emptyResultFixedDelayRetryStrategy}.
     */
    @Test
    public void testOrderedWaitTimeoutHandlingWithRetry() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.ORDERED;
        testTimeoutExceptionHandling(outputMode, emptyResultFixedDelayRetryStrategy);
    }

    /** Timeout-exception scenario: {@code UNORDERED} output mode, no retry strategy. */
    @Test
    public void testUnorderedWaitTimeoutHandling() throws Exception {
        final AsyncRetryStrategy retryStrategy = (AsyncRetryStrategy)AsyncRetryStrategies.NO_RETRY_STRATEGY;
        testTimeoutExceptionHandling(AsyncDataStream.OutputMode.UNORDERED, retryStrategy);
    }

    /**
     * Timeout-exception scenario: {@code UNORDERED} output mode,
     * {@code emptyResultFixedDelayRetryStrategy}.
     */
    @Test
    public void testUnorderedWaitTimeoutHandlingWithRetry() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.UNORDERED;
        testTimeoutExceptionHandling(outputMode, emptyResultFixedDelayRetryStrategy);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes one element with {@code NoOpAsyncFunction} using a timeout argument of 10, sets
     * the processing time to 10 and closes the harness.
     *
     * <p>The method contains no explicit assertion; it only declares {@code Throwable} as the
     * expected external failure cause on the environment and must complete without throwing.
     *
     * @param outputMode output mode handed to the test harness
     * @param asyncRetryStrategy retry strategy handed to the test harness
     * @throws Exception if the harness fails while processing or closing
     */
    private void testTimeoutExceptionHandling(AsyncDataStream.OutputMode outputMode, AsyncRetryStrategy asyncRetryStrategy) throws Exception {
        OneInputStreamOperatorTestHarness harness = AsyncWaitOperatorTest.createTestHarnessWithRetry(new NoOpAsyncFunction(), 10L, 2, outputMode, asyncRetryStrategy);
        harness.getEnvironment().setExpectedExternalFailureCause(Throwable.class);
        harness.open();

        synchronized (harness.getCheckpointLock()) {
            harness.processElement(1, 1L);
        }

        // The clock is advanced outside of the checkpoint lock, as in the original flow.
        harness.setProcessingTime(10L);

        synchronized (harness.getCheckpointLock()) {
            harness.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Fills an operator (capacity 10) with 10 elements that cannot complete until
     * {@code trigger} is completed, snapshots it in that state, and then restores the snapshot
     * into a second operator of the same capacity whose async function receives an already
     * completed future. The restored operator must emit the values 0..9 in order.
     *
     * <p>The test carries its own 10 second timeout.
     */
    @Test(timeout=10000L)
    public void testRestartWithFullQueue() throws Exception {
        final int capacity = 10;
        CompletableFuture<Object> trigger = new CompletableFuture<Object>();
        OneInputStreamOperatorTestHarness snapshotHarness = AsyncWaitOperatorTest.createTestHarness(new ControllableAsyncFunction(trigger), 1000L, capacity, AsyncDataStream.OutputMode.ORDERED);
        snapshotHarness.open();

        List<Integer> expectedOutput = new ArrayList<Integer>(capacity);
        OperatorSubtaskState snapshot;
        try {
            synchronized (snapshotHarness.getCheckpointLock()) {
                for (int value = 0; value < capacity; value++) {
                    snapshotHarness.processElement(value, 0L);
                    expectedOutput.add(value);
                }
            }
            synchronized (snapshotHarness.getCheckpointLock()) {
                snapshot = snapshotHarness.snapshot(0L, 0L);
            }
            // Only completed after the snapshot has been taken.
            trigger.complete(null);
        } finally {
            synchronized (snapshotHarness.getCheckpointLock()) {
                snapshotHarness.close();
            }
        }

        OneInputStreamOperatorTestHarness recoverHarness = AsyncWaitOperatorTest.createTestHarness(new ControllableAsyncFunction(CompletableFuture.completedFuture(null)), 1000L, capacity, AsyncDataStream.OutputMode.ORDERED);
        recoverHarness.initializeState(snapshot);
        synchronized (recoverHarness.getCheckpointLock()) {
            recoverHarness.open();
        }
        synchronized (recoverHarness.getCheckpointLock()) {
            recoverHarness.endInput();
            recoverHarness.close();
        }

        ConcurrentLinkedQueue<Object> output = recoverHarness.getOutput();
        List<Integer> outputElements = output.stream().map(r -> (Integer)((StreamRecord<?>)r).getValue()).collect(Collectors.toList());
        Assert.assertThat(outputElements, Matchers.equalTo(expectedOutput));
    }

    /** Drain scenario without a retry strategy. */
    @Test
    public void testIgnoreAsyncOperatorRecordsOnDrain() throws Exception {
        final AsyncRetryStrategy retryStrategy = (AsyncRetryStrategy)AsyncRetryStrategies.NO_RETRY_STRATEGY;
        testIgnoreAsyncOperatorRecordsOnDrain(retryStrategy);
    }

    /** Drain scenario using {@code emptyResultFixedDelayRetryStrategy}. */
    @Test
    public void testIgnoreAsyncOperatorRecordsOnDrainWithRetry() throws Exception {
        final AsyncRetryStrategy retryStrategy = emptyResultFixedDelayRetryStrategy;
        testIgnoreAsyncOperatorRecordsOnDrain(retryStrategy);
    }

    /**
     * Processes two elements with {@code CollectableFuturesAsyncFunction}, completes the
     * collected result futures with an empty list in reverse order, calls
     * {@code finishProcessing()} and asserts that the harness output is empty.
     *
     * @param asyncRetryStrategy retry strategy handed to the operator factory
     * @throws Exception if the harness fails while processing
     */
    private void testIgnoreAsyncOperatorRecordsOnDrain(AsyncRetryStrategy asyncRetryStrategy) throws Exception {
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO);
        // The list is registered with the shared-objects rule and handed to the async function.
        SharedReference resultFutures = this.sharedObjects.add(new ArrayList());
        AsyncWaitOperatorFactory operatorFactory = new AsyncWaitOperatorFactory(new CollectableFuturesAsyncFunction(resultFutures), 1000L, 5, AsyncDataStream.OutputMode.ORDERED, asyncRetryStrategy);

        try (StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)operatorFactory).build()) {
            harness.processElement(new StreamRecord<>(1));
            harness.processElement(new StreamRecord<>(2));

            // Complete the futures last-to-first.
            List collectedFutures = (List)resultFutures.get();
            for (Object pending : Lists.reverse(collectedFutures)) {
                ((ResultFuture)pending).complete(Collections.emptyList());
            }

            harness.finishProcessing();
            Assert.assertTrue(harness.getOutput().isEmpty());
        }
    }

    /** Retry scenario: {@code ORDERED} mode with {@code OddInputEmptyResultAsyncFunction}. */
    @Test
    public void testProcessingTimeOrderedWithRetry() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.ORDERED;
        testProcessingTimeWithRetry(outputMode, new OddInputEmptyResultAsyncFunction());
    }

    /** Retry scenario: {@code UNORDERED} mode with {@code OddInputEmptyResultAsyncFunction}. */
    @Test
    public void testProcessingTimeUnorderedWithRetry() throws Exception {
        final AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.UNORDERED;
        testProcessingTimeWithRetry(outputMode, new OddInputEmptyResultAsyncFunction());
    }

    /**
     * Runs the retry scenario in {@code UNORDERED} mode with an
     * {@code IllWrittenOddInputEmptyResultAsyncFunction}, which completes some result futures
     * repeatedly.
     */
    @Test
    public void testProcessingTimeRepeatedCompleteUnorderedWithRetry() throws Exception {
        AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.UNORDERED;
        testProcessingTimeWithRetry(outputMode, new IllWrittenOddInputEmptyResultAsyncFunction());
    }

    /**
     * Runs the retry scenario in {@code ORDERED} mode with an
     * {@code IllWrittenOddInputEmptyResultAsyncFunction}, which completes some result futures
     * repeatedly.
     */
    @Test
    public void testProcessingTimeRepeatedCompleteOrderedWithRetry() throws Exception {
        AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.ORDERED;
        testProcessingTimeWithRetry(outputMode, new IllWrittenOddInputEmptyResultAsyncFunction());
    }

    /**
     * Shared body of the retry tests.
     *
     * <p>Feeds the records 1..6 (timestamp equal to the value) into an async wait operator
     * configured with {@code emptyResultFixedDelayRetryStrategy}, then polls the harness until
     * at least three output elements are available. The expected output is the doubled
     * even inputs (4, 8, 12) carrying the timestamps 2, 4 and 6.
     *
     * @param mode output mode of the operator; decides whether the output is compared in
     *     order or after sorting
     * @param asyncFunction async function under test
     */
    private void testProcessingTimeWithRetry(AsyncDataStream.OutputMode mode, RichAsyncFunction asyncFunction) throws Exception {
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO);
        try (StreamTaskMailboxTestHarness testHarness = builder.setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new AsyncWaitOperatorFactory((AsyncFunction)asyncFunction, 1000L, 6, mode, emptyResultFixedDelayRetryStrategy)).build()) {
            ArrayDeque<Object> expectedOutput = new ArrayDeque<Object>();
            testHarness.processElement(new StreamRecord((Object)1, 1L));
            testHarness.processElement(new StreamRecord((Object)2, 2L));
            testHarness.processElement(new StreamRecord((Object)3, 3L));
            testHarness.processElement(new StreamRecord((Object)4, 4L));
            testHarness.processElement(new StreamRecord((Object)5, 5L));
            testHarness.processElement(new StreamRecord((Object)6, 6L));
            expectedOutput.add(new StreamRecord((Object)4, 2L));
            expectedOutput.add(new StreamRecord((Object)8, 4L));
            expectedOutput.add(new StreamRecord((Object)12, 6L));

            // Results are produced asynchronously: keep draining the harness until enough
            // output has arrived.
            while (testHarness.getOutput().size() < expectedOutput.size()) {
                testHarness.processAll();
                Thread.sleep(100L);
            }

            if (mode == AsyncDataStream.OutputMode.ORDERED) {
                TestHarnessUtil.assertOutputEquals("ORDERED Output was not correct.", expectedOutput, testHarness.getOutput());
            } else {
                TestHarnessUtil.assertOutputEqualsSorted("UNORDERED Output was not correct.", expectedOutput, testHarness.getOutput(), new StreamRecordComparator());
            }
        }
    }

    /**
     * Runs the always-timeout retry scenario in {@code UNORDERED} mode.
     */
    @Test
    public void testProcessingTimeWithTimeoutFunctionUnorderedWithRetry() throws Exception {
        AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.UNORDERED;
        testProcessingTimeAlwaysTimeoutFunctionWithRetry(outputMode);
    }

    /**
     * Runs the always-timeout retry scenario in {@code ORDERED} mode.
     */
    @Test
    public void testProcessingTimeWithTimeoutFunctionOrderedWithRetry() throws Exception {
        AsyncDataStream.OutputMode outputMode = AsyncDataStream.OutputMode.ORDERED;
        testProcessingTimeAlwaysTimeoutFunctionWithRetry(outputMode);
    }

    /**
     * Shared body of the always-timeout retry tests.
     *
     * <p>Uses an {@code AlwaysTimeoutWithDefaultValueAsyncFunction}, whose invocations fail
     * exceptionally after roughly half a second and whose timeout handler emits {@code -1},
     * together with a fixed-delay retry strategy that retries on exceptions. Two records are
     * fed in; the test polls until two outputs are available, expects {@code -1} for both
     * (with the original timestamps 1 and 2) and checks that neither input was tried more
     * than twice.
     *
     * @param mode output mode of the operator; decides whether the output is compared in
     *     order or after sorting
     */
    private void testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncDataStream.OutputMode mode) throws Exception {
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO);
        AsyncRetryStrategies.FixedDelayRetryStrategy exceptionRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(5, 100L).ifException((Predicate)RetryPredicates.HAS_EXCEPTION_PREDICATE).build();
        AlwaysTimeoutWithDefaultValueAsyncFunction asyncFunction = new AlwaysTimeoutWithDefaultValueAsyncFunction();
        try (StreamTaskMailboxTestHarness testHarness = builder.setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new AsyncWaitOperatorFactory((AsyncFunction)asyncFunction, 1000L, 10, mode, (AsyncRetryStrategy)exceptionRetryStrategy)).build()) {
            ArrayDeque<Object> expectedOutput = new ArrayDeque<Object>();
            testHarness.processElement(new StreamRecord((Object)1, 1L));
            testHarness.processElement(new StreamRecord((Object)2, 2L));
            expectedOutput.add(new StreamRecord((Object)-1, 1L));
            expectedOutput.add(new StreamRecord((Object)-1, 2L));

            // Results are produced asynchronously: keep draining the harness until enough
            // output has arrived.
            while (testHarness.getOutput().size() < expectedOutput.size()) {
                testHarness.processAll();
                Thread.sleep(100L);
            }

            if (mode == AsyncDataStream.OutputMode.ORDERED) {
                TestHarnessUtil.assertOutputEquals("ORDERED Output was not correct.", expectedOutput, testHarness.getOutput());
            } else {
                TestHarnessUtil.assertOutputEqualsSorted("UNORDERED Output was not correct.", expectedOutput, testHarness.getOutput(), new StreamRecordComparator());
            }

            // Each input must have been attempted at most twice before the timeout kicked in.
            Assert.assertTrue(asyncFunction.getTryCount(1) <= 2);
            Assert.assertTrue(asyncFunction.getTryCount(2) <= 2);
        }
    }

    /**
     * Creates a one-input operator test harness around an async wait operator factory.
     *
     * @param function async function handed to the factory
     * @param timeout timeout handed to the factory
     * @param capacity capacity handed to the factory
     * @param outputMode output mode handed to the factory
     * @return harness using {@code IntSerializer.INSTANCE} as its serializer argument
     */
    private static <OUT> OneInputStreamOperatorTestHarness<Integer, OUT> createTestHarness(AsyncFunction<Integer, OUT> function, long timeout, int capacity, AsyncDataStream.OutputMode outputMode) throws Exception {
        AsyncWaitOperatorFactory operatorFactory = new AsyncWaitOperatorFactory(function, timeout, capacity, outputMode);
        return new OneInputStreamOperatorTestHarness(operatorFactory, IntSerializer.INSTANCE);
    }

    /**
     * Creates a one-input operator test harness around an async wait operator factory that is
     * additionally configured with a retry strategy.
     *
     * @param function async function handed to the factory
     * @param timeout timeout handed to the factory
     * @param capacity capacity handed to the factory
     * @param outputMode output mode handed to the factory
     * @param asyncRetryStrategy retry strategy handed to the factory
     * @return harness using {@code IntSerializer.INSTANCE} as its serializer argument
     */
    private static <OUT> OneInputStreamOperatorTestHarness<Integer, OUT> createTestHarnessWithRetry(AsyncFunction<Integer, OUT> function, long timeout, int capacity, AsyncDataStream.OutputMode outputMode, AsyncRetryStrategy<OUT> asyncRetryStrategy) throws Exception {
        AsyncWaitOperatorFactory operatorFactory = new AsyncWaitOperatorFactory(function, timeout, capacity, outputMode, asyncRetryStrategy);
        return new OneInputStreamOperatorTestHarness(operatorFactory, IntSerializer.INSTANCE);
    }

    /**
     * Async function whose invocations never succeed: every call to {@link #asyncInvoke} is
     * counted per input and then completed exceptionally from a background task after a
     * 501 ms sleep. The timeout handler answers with the single value {@code -1}.
     *
     * <p>The per-input try counts live in a static map that is replaced in {@link #open}.
     */
    private static class AlwaysTimeoutWithDefaultValueAsyncFunction extends RichAsyncFunction<Integer, Integer> {
        private static final long serialVersionUID = 1L;
        private static Map<Integer, Integer> tryCounts = new HashMap<Integer, Integer>();

        private AlwaysTimeoutWithDefaultValueAsyncFunction() {
        }

        /**
         * Returns how often {@link #asyncInvoke} has been called for {@code item}, or 0 if it
         * has not been seen.
         */
        @VisibleForTesting
        public int getTryCount(Integer item) {
            Integer tries = tryCounts.getOrDefault(item, 0);
            return tries;
        }

        /** Opens the function and starts over with an empty try-count map. */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            tryCounts = new HashMap<Integer, Integer>();
        }

        /** Counts the attempt, then fails the future asynchronously after a delay. */
        public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
            tryCounts.merge(input, 1, Integer::sum);
            Runnable failAfterDelay = () -> {
                try {
                    Thread.sleep(501L);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                Throwable failure = new Exception("Dummy error");
                resultFuture.completeExceptionally(failure);
            };
            CompletableFuture.runAsync(failAfterDelay);
        }

        /** Completes the future with the default value {@code -1}. */
        public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
            List<Integer> defaultResult = Collections.singletonList(-1);
            resultFuture.complete(defaultResult);
        }
    }

    /**
     * Async function that does nothing: {@link #asyncInvoke} returns without ever touching
     * the result future.
     */
    private static class NoOpAsyncFunction<IN, OUT> implements AsyncFunction<IN, OUT> {
        private static final long serialVersionUID = -3060481953330480694L;

        private NoOpAsyncFunction() {
        }

        public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
            // Intentionally empty: the result future is never completed here.
        }
    }

    /**
     * Async function whose results are gated by an externally supplied trigger future: each
     * input is echoed back as a single-element result once the trigger completes.
     */
    private static class ControllableAsyncFunction<IN> implements AsyncFunction<IN, IN> {
        private static final long serialVersionUID = -4214078239267288636L;
        private transient CompletableFuture<Void> trigger;

        /**
         * @param trigger future that releases the results; must not be {@code null}
         */
        private ControllableAsyncFunction(CompletableFuture<Void> trigger) {
            CompletableFuture<Void> checkedTrigger = Preconditions.checkNotNull(trigger);
            this.trigger = checkedTrigger;
        }

        public void asyncInvoke(IN input, ResultFuture<IN> resultFuture) throws Exception {
            Collection<IN> echoed = Collections.singleton(input);
            trigger.thenAccept(ignored -> resultFuture.complete(echoed));
        }
    }

    /**
     * Async function that never completes anything itself; it only records every result
     * future it receives in a shared list so that the caller can complete them later.
     */
    private static class CollectableFuturesAsyncFunction<IN> implements AsyncFunction<IN, IN> {
        private static final long serialVersionUID = -4214078239227288637L;
        private final SharedReference<List<ResultFuture<?>>> resultFutures;

        /**
         * @param resultFutures shared list that receives the result futures
         */
        private CollectableFuturesAsyncFunction(SharedReference<List<ResultFuture<?>>> resultFutures) {
            this.resultFutures = resultFutures;
        }

        public void asyncInvoke(IN input, ResultFuture<IN> resultFuture) throws Exception {
            List<ResultFuture<?>> collected = resultFutures.get();
            collected.add(resultFuture);
        }
    }

    /**
     * Async function that fails every invocation by completing the result future
     * exceptionally with a test exception.
     */
    private static class UserExceptionAsyncFunction implements AsyncFunction<Integer, Integer> {
        private static final long serialVersionUID = 6326568632967110990L;

        private UserExceptionAsyncFunction() {
        }

        public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
            Throwable failure = new Exception("Test exception");
            resultFuture.completeExceptionally(failure);
        }
    }

    /**
     * Orders harness output elements for the sorted output comparison.
     *
     * <p>Any pair involving a {@code Watermark} compares as equal. Two stream records are
     * ordered by timestamp first and by their {@code Integer} value second.
     */
    private class StreamRecordComparator
    implements Comparator<Object> {
        private StreamRecordComparator() {
        }

        /**
         * @param o1 first element; a {@code Watermark} or a {@code StreamRecord} holding an
         *     {@code Integer}
         * @param o2 second element; same expectations as {@code o1}
         * @return 0 if either argument is a watermark, otherwise the timestamp ordering,
         *     falling back to the value ordering on equal timestamps
         */
        @Override
        public int compare(Object o1, Object o2) {
            if (o1 instanceof Watermark || o2 instanceof Watermark) {
                return 0;
            }
            StreamRecord sr0 = (StreamRecord)o1;
            StreamRecord sr1 = (StreamRecord)o2;
            // Long.compare instead of casting the difference to int: the narrowing cast can
            // flip the sign or collapse a non-zero difference to 0.
            int byTimestamp = Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }
            return ((Integer)sr0.getValue()).compareTo((Integer)sr1.getValue());
        }
    }

    /**
     * Variant of the odd-input/empty-result function that misbehaves on purpose: for inputs
     * whose remainder modulo 2 is 1 it completes the same result future ten times with an
     * empty result. All other inputs are answered once with the doubled input.
     *
     * <p>The work runs on the shared executor after a 3 ms sleep.
     */
    private static class IllWrittenOddInputEmptyResultAsyncFunction extends MyAbstractAsyncFunction<Integer> {
        private static final long serialVersionUID = 1L;

        private IllWrittenOddInputEmptyResultAsyncFunction() {
        }

        public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
            Runnable task = () -> {
                try {
                    Thread.sleep(3L);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                if (input % 2 != 1) {
                    resultFuture.complete(Collections.singletonList(input * 2));
                    return;
                }
                // Repeated completion of one future is the "ill-written" part.
                int remaining = 10;
                while (remaining-- > 0) {
                    resultFuture.complete(Collections.<Integer>emptyList());
                }
            };
            executorService.submit(task);
        }
    }

    /**
     * Async function that answers inputs whose remainder modulo 2 is 1 with an empty result
     * and every other input with the doubled input.
     *
     * <p>The work runs on the shared executor after a 3 ms sleep.
     */
    private static class OddInputEmptyResultAsyncFunction extends MyAbstractAsyncFunction<Integer> {
        private static final long serialVersionUID = 1L;

        private OddInputEmptyResultAsyncFunction() {
        }

        public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
            Runnable task = () -> {
                try {
                    Thread.sleep(3L);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                if (input % 2 != 1) {
                    resultFuture.complete(Collections.singletonList(input * 2));
                    return;
                }
                resultFuture.complete(Collections.<Integer>emptyList());
            };
            executorService.submit(task);
        }
    }

    /**
     * Async function whose completion is held back by a static latch: each invocation submits
     * a task to the common fork-join pool that waits for {@link #COMPLETION_TRIGGER} and then
     * echoes the input. A call to {@link #timeout} is recorded in {@link #TIMED_OUT}.
     */
    private static class TimeoutAfterCompletionTestFunction implements AsyncFunction<Integer, Integer> {
        static final AtomicBoolean TIMED_OUT = new AtomicBoolean(false);
        static final CountDownLatch COMPLETION_TRIGGER = new CountDownLatch(1);

        private TimeoutAfterCompletionTestFunction() {
        }

        public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
            List<Integer> echoed = Collections.singletonList(input);
            ForkJoinPool.commonPool().submit(() -> {
                // Hold the result back until the test counts the latch down.
                COMPLETION_TRIGGER.await();
                resultFuture.complete(echoed);
                return null;
            });
        }

        /** Only records that the timeout fired; the result future is left untouched. */
        public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
            TIMED_OUT.set(true);
        }
    }

    /**
     * {@code LazyAsyncFunction} whose timeout handler does not fail but completes the result
     * future with the input multiplied by three.
     */
    private static class IgnoreTimeoutLazyAsyncFunction extends LazyAsyncFunction {
        private static final long serialVersionUID = 1428714561365346128L;

        private IgnoreTimeoutLazyAsyncFunction() {
        }

        public void timeout(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
            int tripled = input * 3;
            resultFuture.complete(Collections.singletonList(tripled));
        }
    }

    /**
     * Async function over {@code Tuple1<Integer>} inputs that emits twice the tuple's field.
     *
     * <p>The field {@code f0} is read inside the task submitted to the shared executor, i.e.
     * when the task runs rather than when {@link #asyncInvoke} is called.
     */
    private static class InputReusedAsyncFunction extends MyAbstractAsyncFunction<Tuple1<Integer>> {
        private static final long serialVersionUID = 8627909616410487720L;

        private InputReusedAsyncFunction() {
        }

        public void asyncInvoke(final Tuple1<Integer> input, final ResultFuture<Integer> resultFuture) throws Exception {
            Runnable emitDoubled = () -> {
                int doubled = input.f0 * 2;
                resultFuture.complete(Collections.singletonList(doubled));
            };
            executorService.submit(emitDoubled);
        }
    }

    /**
     * Async function that holds every result back until {@link #countDown()} is called: each
     * invocation submits a task to the shared executor that waits on a static latch and then
     * echoes the input as a single-element result.
     *
     * <p>The latch is static and is replaced by a fresh one-count latch whenever an instance
     * is constructed.
     */
    private static class LazyAsyncFunction
    extends MyAsyncFunction {
        private static final long serialVersionUID = 3537791752703154670L;
        private static CountDownLatch latch;

        public LazyAsyncFunction() {
            latch = new CountDownLatch(1);
        }

        @Override
        public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
            executorService.submit(new Runnable(){

                @Override
                public void run() {
                    boolean interrupted = false;
                    try {
                        latch.await();
                    }
                    catch (InterruptedException ignored) {
                        // The result is still delivered; the interruption is remembered and
                        // re-signalled below instead of being silently dropped.
                        interrupted = true;
                    }
                    try {
                        resultFuture.complete(Collections.singletonList(input));
                    }
                    finally {
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }

        /** Releases all invocations currently waiting on the latch. */
        public static void countDown() {
            latch.countDown();
        }
    }

    /**
     * Async function that answers every input with the doubled input, computed and delivered
     * from a task on the shared executor.
     */
    private static class MyAsyncFunction extends MyAbstractAsyncFunction<Integer> {
        private static final long serialVersionUID = -1504699677704123889L;

        private MyAsyncFunction() {
        }

        public void asyncInvoke(final Integer input, final ResultFuture<Integer> resultFuture) throws Exception {
            Runnable emitDoubled = () -> {
                int doubled = input * 2;
                resultFuture.complete(Collections.singletonList(doubled));
            };
            executorService.submit(emitDoubled);
        }
    }

    /**
     * Base class for the async test functions that run their work on a shared thread pool.
     *
     * <p>The pool is static and reference-counted: the first {@link #open} creates it, every
     * {@link #open} increments the counter, and the {@link #close} that brings the counter
     * back to zero shuts the pool down. All counter and pool updates happen while holding the
     * class monitor.
     */
    private static abstract class MyAbstractAsyncFunction<IN>
    extends RichAsyncFunction<IN, Integer> {
        private static final long serialVersionUID = 8522411971886428444L;
        /** How long, in milliseconds, to wait for the pool to terminate before forcing it. */
        private static final long TERMINATION_TIMEOUT = 5000L;
        /** Number of threads in the shared pool. */
        private static final int THREAD_POOL_SIZE = 10;
        static ExecutorService executorService;
        static int counter = 0;

        private MyAbstractAsyncFunction() {
        }

        /** Opens the function and creates the shared pool if this is the first open instance. */
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            synchronized (MyAbstractAsyncFunction.class) {
                if (counter == 0) {
                    executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
                }
                ++counter;
            }
        }

        /** Closes the function and releases this instance's share of the pool. */
        public void close() throws Exception {
            super.close();
            this.freeExecutor();
        }

        /**
         * Decrements the usage counter and, once it reaches zero, shuts the pool down: an
         * orderly shutdown first, then a forced one if the pool does not terminate within
         * {@link #TERMINATION_TIMEOUT} or the waiting thread is interrupted.
         */
        private void freeExecutor() {
            synchronized (MyAbstractAsyncFunction.class) {
                --counter;
                if (counter == 0) {
                    executorService.shutdown();
                    try {
                        if (!executorService.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.MILLISECONDS)) {
                            executorService.shutdownNow();
                        }
                    }
                    catch (InterruptedException interrupted) {
                        executorService.shutdownNow();
                        // Preserve the interruption for the caller.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
}

