package org.apache.flink.streaming.runtime.tasks;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.class */
public class OneInputStreamTaskTest extends TestLogger {
    /** Descriptor of the integer list state named "test" that {@code TestingStreamOperator} writes on snapshot and reads on initialization. */
    private static final ListStateDescriptor<Integer> TEST_DESCRIPTOR = new ListStateDescriptor<>("test", new IntSerializer());

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest$DuplicatingOperator.class */
    static class DuplicatingOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        public void processElement(StreamRecord<String> streamRecord) {
            this.output.collect(streamRecord);
            this.output.collect(streamRecord);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest$IdentityKeySelector.class */
    private static class IdentityKeySelector<IN> implements KeySelector<IN, IN> {
        private static final long serialVersionUID = -3555913664416688425L;

        private IdentityKeySelector() {
        }

        public IN getKey(IN in) throws Exception {
            return in;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest$IdentityMap.class */
    private static class IdentityMap implements MapFunction<String, String> {
        private static final long serialVersionUID = 1;

        private IdentityMap() {
        }

        public String map(String str) throws Exception {
            return str;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest$OddEvenOperator.class */
    static class OddEvenOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
        private final OutputTag<Integer> oddOutputTag = new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO);
        private final OutputTag<Integer> evenOutputTag = new OutputTag<>("even", BasicTypeInfo.INT_TYPE_INFO);

        public void processElement(StreamRecord<Integer> streamRecord) {
            if (((Integer) streamRecord.getValue()).intValue() % 2 == 0) {
                this.output.collect(this.evenOutputTag, streamRecord);
            } else {
                this.output.collect(this.oddOutputTag, streamRecord);
            }
            this.output.collect(streamRecord);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest$PassThroughOperator.class */
    static class PassThroughOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        PassThroughOperator() {
        }

        public void processElement(StreamRecord<T> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest$TestOpenCloseMapFunction.class */
    private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
        private static final long serialVersionUID = 1;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        TestOpenCloseMapFunction() {
            openCalled = false;
            closeCalled = false;
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        public String map(String str) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return str;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest$TestOperator.class */
    private static class TestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1;

        private TestOperator() {
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        public void finish() throws Exception {
            Assert.assertTrue(getContainingTask().getTimerService().isAlive());
            super.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest$TestingStreamOperator.class */
    public static class TestingStreamOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
        private static final long serialVersionUID = 774614855940397174L;
        public static int numberRestoreCalls = 0;
        public static int numberSnapshotCalls = 0;

        private TestingStreamOperator() {
        }

        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
            ListState listState = getOperatorStateBackend().getListState(OneInputStreamTaskTest.TEST_DESCRIPTOR);
            listState.clear();
            listState.add(42);
            listState.add(4711);
            numberSnapshotCalls++;
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            if (stateInitializationContext.isRestored()) {
                numberRestoreCalls++;
            }
            ListState listState = stateInitializationContext.getOperatorStateStore().getListState(OneInputStreamTaskTest.TEST_DESCRIPTOR);
            if (numberSnapshotCalls == 0) {
                for (Integer num : (Iterable) listState.get()) {
                    Assert.fail();
                }
                return;
            }
            HashSet hashSet = new HashSet();
            Iterator it = ((Iterable) listState.get()).iterator();
            while (it.hasNext()) {
                hashSet.add((Integer) it.next());
            }
            Assert.assertEquals(2L, hashSet.size());
            Assert.assertTrue(hashSet.contains(42));
            Assert.assertTrue(hashSet.contains(4711));
        }

        public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest$TriggerableFailOnWatermarkTestOperator.class */
    private static class TriggerableFailOnWatermarkTestOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 2048954179291813243L;
        public static final String EXPECT_FORWARDED_WATERMARKS_MARKER = "EXPECT_WATERMARKS";
        public static final String NO_FORWARDED_WATERMARKS_MARKER = "NO_WATERMARKS";
        protected boolean expectForwardedWatermarks;

        private TriggerableFailOnWatermarkTestOperator() {
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            this.output.collect(streamRecord);
            if (((String) streamRecord.getValue()).equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) {
                this.expectForwardedWatermarks = true;
            } else if (((String) streamRecord.getValue()).equals(NO_FORWARDED_WATERMARKS_MARKER)) {
                this.expectForwardedWatermarks = false;
            } else {
                handleElement(streamRecord);
            }
        }

        public void processWatermark(Watermark watermark) throws Exception {
            if (!this.expectForwardedWatermarks) {
                throw new Exception("Received a " + watermark + ", but this operator should not be forwarded watermarks.");
            }
            handleWatermark(watermark);
        }

        protected void handleElement(StreamRecord<String> streamRecord) {
        }

        protected void handleWatermark(Watermark watermark) {
            this.output.emitWatermark(watermark);
        }
    }

    /**
     * Variant of {@link TriggerableFailOnWatermarkTestOperator} that generates watermarks from the
     * numeric value of its records and only forwards the received {@link Watermark#MAX_WATERMARK}.
     */
    private static class WatermarkGeneratingTestOperator extends TriggerableFailOnWatermarkTestOperator {
        private static final long serialVersionUID = -5064871833244157221L;
        private long lastWatermark;

        private WatermarkGeneratingTestOperator() {
            super();
        }

        /** Parses the record value as a long and emits it as a watermark if it exceeds the last one. */
        @Override
        protected void handleElement(StreamRecord<String> element) {
            final long timestamp = Long.parseLong(element.getValue());
            if (timestamp <= this.lastWatermark) {
                return;
            }
            this.output.emitWatermark(new Watermark(timestamp));
            this.lastWatermark = timestamp;
        }

        /** Swallows every received watermark except {@link Watermark#MAX_WATERMARK}. */
        @Override
        protected void handleWatermark(Watermark watermark) {
            if (!watermark.equals(Watermark.MAX_WATERMARK)) {
                return;
            }
            this.output.emitWatermark(watermark);
            this.lastWatermark = Long.MAX_VALUE;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest$WatermarkMetricOperator.class */
    static class WatermarkMetricOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            this.output.collect(streamRecord);
        }

        public void processWatermark(Watermark watermark) {
            this.output.emitWatermark(new Watermark(watermark.getTimestamp() * 2));
        }
    }

    /**
     * Runs a {@link StreamMap} wrapping {@link TestOpenCloseMapFunction} and checks that the function
     * was closed and that both records arrive at the output with their timestamps.
     */
    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> testHarness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        final StreamConfig streamConfig = testHarness.getStreamConfig();
        final StreamMap<String, String> mapOperator = new StreamMap<>(new TestOpenCloseMapFunction());
        streamConfig.setStreamOperator(mapOperator);
        streamConfig.setOperatorID(new OperatorID());

        final long initialTime = 0L;
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        testHarness.processElement(new StreamRecord<>("Hello", initialTime + 1));
        testHarness.processElement(new StreamRecord<>("Ciao", initialTime + 2));
        expectedOutput.add(new StreamRecord<>("Hello", initialTime + 1));
        expectedOutput.add(new StreamRecord<>("Ciao", initialTime + 2));

        testHarness.waitForInputProcessing();
        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Checks which watermarks and watermark statuses an identity map task emits while its four inputs
     * (addressed by the index pairs (0,0), (0,1), (1,0) and (1,1) — presumably input gate and channel)
     * advance and toggle between idle and active.
     */
    @Test
    public void testWatermarkAndWatermarkStatusForwarding() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> testHarness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        final StreamConfig streamConfig = testHarness.getStreamConfig();
        final StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
        streamConfig.setStreamOperator(mapOperator);
        streamConfig.setOperatorID(new OperatorID());

        final long initialTime = 0L;
        final String failureMessage = "Output was not correct.";
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // Three of the four inputs report the initial watermark: nothing is expected yet.
        testHarness.processElement(new Watermark(initialTime), 0, 0);
        testHarness.processElement(new Watermark(initialTime), 0, 1);
        testHarness.processElement(new Watermark(initialTime), 1, 0);
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

        // The fourth input reports it as well: the watermark is expected at the output.
        testHarness.processElement(new Watermark(initialTime), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime));
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

        // Records pass through unchanged.
        testHarness.processElement(new StreamRecord<>("Hello", initialTime));
        testHarness.processElement(new StreamRecord<>("Ciao", initialTime));
        expectedOutput.add(new StreamRecord<>("Hello", initialTime));
        expectedOutput.add(new StreamRecord<>("Ciao", initialTime));

        // Inputs at 4, 3, 3 and 2: the expected watermark is the smallest one, 2.
        testHarness.processElement(new Watermark(initialTime + 4), 0, 0);
        testHarness.processElement(new Watermark(initialTime + 3), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 3), 1, 0);
        testHarness.processElement(new Watermark(initialTime + 2), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 2));
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

        // Input (1,1) moves to 4: the expected watermark becomes 3.
        testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 3));
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

        // The remaining two inputs move to 4: the expected watermark becomes 4.
        testHarness.processElement(new Watermark(initialTime + 4), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 4), 1, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 4));
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

        // Two inputs turn idle, the others advance to 6 and 5, then (1,1) turns idle too:
        // watermarks 5 and 6 are expected, in that order.
        testHarness.processElement(WatermarkStatus.IDLE, 0, 1);
        testHarness.processElement(WatermarkStatus.IDLE, 1, 0);
        testHarness.processElement(new Watermark(initialTime + 6), 0, 0);
        testHarness.processElement(new Watermark(initialTime + 5), 1, 1);
        testHarness.processElement(WatermarkStatus.IDLE, 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 5));
        expectedOutput.add(new Watermark(initialTime + 6));
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

        // The last active input turns idle: IDLE is expected at the output.
        testHarness.processElement(WatermarkStatus.IDLE, 0, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(WatermarkStatus.IDLE);
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

        // Two inputs become active again: a single ACTIVE is expected at the output.
        testHarness.processElement(WatermarkStatus.ACTIVE, 1, 0);
        testHarness.processElement(WatermarkStatus.ACTIVE, 0, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(WatermarkStatus.ACTIVE);
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // Only the two string records count as raw elements.
        final int rawElementCount = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()).size();
        Assert.assertEquals(2L, rawElementCount);
    }

    /**
     * Runs a chain of three operators (a {@code TriggerableFailOnWatermarkTestOperator}, a
     * {@code WatermarkGeneratingTestOperator} and another {@code TriggerableFailOnWatermarkTestOperator})
     * and checks the task output in three phases: before the input is marked idle, while it is idle,
     * and after it became active again. The expected output contains generated watermarks only in the
     * first and the last phase.
     */
    @Test
    public void testWatermarksNotForwardedWithinChainWhenIdle() throws Exception {
        OneInputStreamTaskTestHarness oneInputStreamTaskTestHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 1, 1, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        // Chain: head operator -> watermark generating operator -> tail operator.
        oneInputStreamTaskTestHarness.setupOperatorChain(new OperatorID(42L, 42L), (StreamOperator<?>) new TriggerableFailOnWatermarkTestOperator()).chain(new OperatorID(4711L, 42L), (OneInputStreamOperator) new WatermarkGeneratingTestOperator(), (TypeSerializer) StringSerializer.INSTANCE).chain(new OperatorID(123L, 123L), (OneInputStreamOperator) new TriggerableFailOnWatermarkTestOperator(), (TypeSerializer) StringSerializer.INSTANCE).finish();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamTaskTestHarness.invoke();
        oneInputStreamTaskTestHarness.waitForTaskRunning();
        // Phase 1: the marker record tells the operators to expect watermarks.
        oneInputStreamTaskTestHarness.processElement(new StreamRecord(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("10"), 0, 0);
        // Watermark 15 is not part of the expected output; the generating operator only forwards MAX_WATERMARK.
        oneInputStreamTaskTestHarness.processElement(new Watermark(15L));
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("20"), 0, 0);
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("30"), 0, 0);
        oneInputStreamTaskTestHarness.waitForInputProcessing();
        // Each numeric record is expected to be followed by a watermark with the same value.
        concurrentLinkedQueue.add(new StreamRecord(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
        concurrentLinkedQueue.add(new StreamRecord("10"));
        concurrentLinkedQueue.add(new Watermark(10L));
        concurrentLinkedQueue.add(new StreamRecord("20"));
        concurrentLinkedQueue.add(new Watermark(20L));
        concurrentLinkedQueue.add(new StreamRecord("30"));
        concurrentLinkedQueue.add(new Watermark(30L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, oneInputStreamTaskTestHarness.getOutput());
        // Phase 2: the input turns idle and the marker record tells the operators to fail on any watermark.
        oneInputStreamTaskTestHarness.processElement(WatermarkStatus.IDLE);
        oneInputStreamTaskTestHarness.processElement(new StreamRecord(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("40"), 0, 0);
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("50"), 0, 0);
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("60"), 0, 0);
        oneInputStreamTaskTestHarness.processElement(new Watermark(65L));
        oneInputStreamTaskTestHarness.waitForInputProcessing();
        // Only the status change and the records are expected; no watermark is expected for this phase.
        concurrentLinkedQueue.add(WatermarkStatus.IDLE);
        concurrentLinkedQueue.add(new StreamRecord(TriggerableFailOnWatermarkTestOperator.NO_FORWARDED_WATERMARKS_MARKER));
        concurrentLinkedQueue.add(new StreamRecord("40"));
        concurrentLinkedQueue.add(new StreamRecord("50"));
        concurrentLinkedQueue.add(new StreamRecord("60"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, oneInputStreamTaskTestHarness.getOutput());
        // Phase 3: the input becomes active again and watermarks are expected once more.
        oneInputStreamTaskTestHarness.processElement(WatermarkStatus.ACTIVE);
        oneInputStreamTaskTestHarness.processElement(new StreamRecord(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("70"), 0, 0);
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("80"), 0, 0);
        oneInputStreamTaskTestHarness.processElement(new StreamRecord("90"), 0, 0);
        oneInputStreamTaskTestHarness.waitForInputProcessing();
        concurrentLinkedQueue.add(WatermarkStatus.ACTIVE);
        concurrentLinkedQueue.add(new StreamRecord(TriggerableFailOnWatermarkTestOperator.EXPECT_FORWARDED_WATERMARKS_MARKER));
        concurrentLinkedQueue.add(new StreamRecord("70"));
        concurrentLinkedQueue.add(new Watermark(70L));
        concurrentLinkedQueue.add(new StreamRecord("80"));
        concurrentLinkedQueue.add(new Watermark(80L));
        concurrentLinkedQueue.add(new StreamRecord("90"));
        concurrentLinkedQueue.add(new Watermark(90L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, oneInputStreamTaskTestHarness.getOutput());
        oneInputStreamTaskTestHarness.endInput();
        oneInputStreamTaskTestHarness.waitForTaskCompletion();
        // 12 raw elements: the three marker records plus the nine numeric records sent above.
        Assert.assertEquals(12L, TestHarnessUtil.getRawElementsFromOutput(oneInputStreamTaskTestHarness.getOutput()).size());
    }

    /**
     * Sends the barrier of checkpoint 0 to all four inputs of an identity map task, one input first and
     * the other three after two records were processed, and expects the records followed by a single
     * barrier at the output.
     */
    @Test
    public void testCheckpointBarriers() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> testHarness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        final StreamConfig streamConfig = testHarness.getStreamConfig();
        final StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
        streamConfig.setStreamOperator(mapOperator);
        streamConfig.setOperatorID(new OperatorID());

        final long initialTime = 0L;
        final String failureMessage = "Output was not correct.";
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // First barrier arrives on input (0,0) only.
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);

        // Records on input (1,1) are expected at the output although the barrier is still pending elsewhere.
        testHarness.processElement(new StreamRecord<>("Hello-1-1", initialTime), 1, 1);
        testHarness.processElement(new StreamRecord<>("Ciao-1-1", initialTime), 1, 1);
        expectedOutput.add(new StreamRecord<>("Hello-1-1", initialTime));
        expectedOutput.add(new StreamRecord<>("Ciao-1-1", initialTime));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

        // The remaining three inputs receive the barrier.
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
        testHarness.waitForInputProcessing();

        // Exactly one barrier is expected downstream.
        expectedOutput.add(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()));

        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());
    }

    /**
     * Starts checkpoint 0 on one input, then delivers the barriers of checkpoint 1 to all inputs before
     * checkpoint 0 completes. The expected output is a cancellation marker for checkpoint 0 followed
     * by the barrier of checkpoint 1; the late barriers of checkpoint 0 are expected to add nothing.
     */
    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        final OneInputStreamTaskTestHarness<String, String> testHarness =
                new OneInputStreamTaskTestHarness<>(
                        OneInputStreamTask::new, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        final StreamConfig streamConfig = testHarness.getStreamConfig();
        final StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
        streamConfig.setStreamOperator(mapOperator);
        streamConfig.setOperatorID(new OperatorID());

        final long initialTime = 0L;
        final String failureMessage = "Output was not correct.";
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // Checkpoint 0 starts on input (0,0) only.
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);

        testHarness.processElement(new StreamRecord<>("Hello-1-1", initialTime), 1, 1);
        testHarness.processElement(new StreamRecord<>("Ciao-1-1", initialTime), 1, 1);
        expectedOutput.add(new StreamRecord<>("Hello-1-1", initialTime));
        expectedOutput.add(new StreamRecord<>("Ciao-1-1", initialTime));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

        // Checkpoint 1 overtakes checkpoint 0 on every input.
        testHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        testHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
        testHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        testHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
        expectedOutput.add(new CancelCheckpointMarker(0L));
        expectedOutput.add(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());

        // Late barriers of the cancelled checkpoint 0 must not change the expected output.
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
        testHarness.waitForInputProcessing();

        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals(failureMessage, expectedOutput, testHarness.getOutput());
    }

    /**
     * Takes a checkpoint of a task running a chain of {@code TestingStreamOperator}s, then starts a
     * second task from the reported snapshot and checks that every chained operator was restored.
     *
     * <p>The static counters of {@code TestingStreamOperator} are reset before the test starts and in
     * a {@code finally} block, so a failure here (or a leftover from another test) cannot leak counter
     * state across tests.
     */
    @Test
    public void testSnapshottingAndRestoring() throws Exception {
        final int numberChainedTasks = 11;
        final Deadline deadline = Deadline.fromNow(Duration.ofMinutes(2L));

        TestingStreamOperator.numberRestoreCalls = 0;
        TestingStreamOperator.numberSnapshotCalls = 0;
        try {
            // First run: start the task and trigger checkpoint 1.
            final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            testHarness.setupOutputForSingletonOperatorChain();
            final IdentityKeySelector<String> keySelector = new IdentityKeySelector<>();
            testHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
            configureChainedTestingStreamOperator(testHarness.getStreamConfig(), numberChainedTasks);
            final TestTaskStateManager taskStateManager = testHarness.taskStateManager;

            testHarness.invoke();
            testHarness.waitForTaskRunning();
            // getTask() is the harness accessor; the decompiler had renamed it to a synthetic mo166getTask().
            testHarness.getTask().triggerCheckpointAsync(new CheckpointMetaData(1L, 1L), CheckpointOptions.forCheckpointWithDefaultLocation()).get();
            // A fresh start must not count as a restore.
            Assert.assertEquals(0L, TestingStreamOperator.numberRestoreCalls);
            taskStateManager.getWaitForReportLatch().await();
            Assert.assertEquals(1L, taskStateManager.getReportedCheckpointId());
            testHarness.endInput();
            testHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());

            // Second run: a new task is initialized from the snapshot reported by the first one.
            final OneInputStreamTaskTestHarness<String, String> restoredTaskHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            restoredTaskHarness.configureForKeyedStream(keySelector, BasicTypeInfo.STRING_TYPE_INFO);
            restoredTaskHarness.setTaskStateSnapshot(1L, taskStateManager.getLastJobManagerTaskStateSnapshot());
            configureChainedTestingStreamOperator(restoredTaskHarness.getStreamConfig(), numberChainedTasks);
            // One subtask state per chained operator.
            Assert.assertEquals(numberChainedTasks, taskStateManager.getLastJobManagerTaskStateSnapshot().getSubtaskStateMappings().size());

            TestingStreamOperator.numberRestoreCalls = 0;
            restoredTaskHarness.taskStateManager.restoreLatestCheckpointState(taskStateManager.getJobManagerTaskStateSnapshotsByCheckpointId());
            restoredTaskHarness.invoke();
            restoredTaskHarness.endInput();
            restoredTaskHarness.waitForTaskCompletion(deadline.timeLeft().toMillis());
            // Every chained operator must have seen a restored context.
            Assert.assertEquals(numberChainedTasks, TestingStreamOperator.numberRestoreCalls);
        } finally {
            TestingStreamOperator.numberRestoreCalls = 0;
            TestingStreamOperator.numberSnapshotCalls = 0;
        }
    }

    /**
     * Runs a task with a single {@code TestOperator}, asserts that the timer service is alive
     * while the task is running, then ends the input, waits for completion and shuts the timer
     * service down.
     */
    @Test
    public void testQuiesceTimerServiceAfterOpClose() throws Exception {
        OneInputStreamTaskTestHarness harness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();

        StreamConfig operatorConfig = harness.getStreamConfig();
        operatorConfig.setStreamOperator(new TestOperator());
        operatorConfig.setOperatorID(new OperatorID());

        harness.invoke();
        harness.waitForTaskRunning();

        // The timer service must be usable for as long as the task is running.
        SystemProcessingTimeService processingTimeService = harness.getTimerService();
        Assert.assertTrue(processingTimeService.isAlive());

        harness.endInput();
        harness.waitForTaskCompletion();

        // Explicit shutdown once the task has completed.
        processingTimeService.shutdownService();
    }

    /**
     * Sends one record through a chain of two {@code TestBoundedOneInputStreamOperator}s and
     * checks that the output holds the record plus the "End of input" and "Finish" markers of
     * both operators, in any order.
     */
    @Test
    public void testClosingAllOperatorsOnChainProperly() throws Exception {
        OneInputStreamTaskTestHarness harness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOperatorChain(new OperatorID(), (StreamOperator<?>) new TestBoundedOneInputStreamOperator("Operator0"))
                .chain(new OperatorID(), new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()))
                .finish();

        harness.invoke();
        harness.waitForTaskRunning();
        harness.processElement(new StreamRecord("Hello"));
        harness.endInput();
        harness.waitForTaskCompletion();

        // Order is irrelevant for the matcher below; only the set of emitted elements is checked.
        Object[] expectedElements = new Object[] {
            new StreamRecord("Hello"),
            new StreamRecord("[Operator0]: End of input"),
            new StreamRecord("[Operator0]: Finish"),
            new StreamRecord("[Operator1]: End of input"),
            new StreamRecord("[Operator1]: Finish")
        };
        MatcherAssert.assertThat(harness.getOutput(), Matchers.containsInAnyOrder(expectedElements));
    }

    /**
     * Pushes five records through a chain of three {@code DuplicatingOperator}s and checks the
     * task-level numRecordsIn / numRecordsOut counters taken from the task's IO metric group.
     */
    @Test
    public void testOperatorMetricReuse() throws Exception {
        OneInputStreamTaskTestHarness harness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOperatorChain(new OperatorID(), (StreamOperator<?>) new DuplicatingOperator())
                .chain(new OperatorID(), new DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()))
                .chain(new OperatorID(), new DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()))
                .finish();

        final TaskMetricGroup taskMetricGroup = TaskManagerMetricGroup.createTaskManagerMetricGroup(NoOpMetricRegistry.INSTANCE, "host", ResourceID.generate())
                .addJob(new JobID(), "jobname")
                .addTask(ExecutionGraphTestUtils.createExecutionAttemptId(), "task");

        // Environment identical to the harness defaults except that it hands out our metric group.
        StreamMockEnvironment environment = new StreamMockEnvironment(harness.jobConfig, harness.taskConfig, harness.memorySize, new MockInputSplitProvider(), harness.bufferSize, new TestTaskStateManager()) {
            @Override
            public TaskMetricGroup getMetricGroup() {
                return taskMetricGroup;
            }
        };

        Counter recordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
        Counter recordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();

        harness.invoke(environment);
        harness.waitForTaskRunning();

        for (int sent = 0; sent < 5; sent++) {
            harness.processElement(new StreamRecord("hello"));
        }
        harness.waitForInputProcessing();

        Assert.assertEquals(5L, recordsInCounter.getCount());
        // NOTE(review): 40 == 5 * 2 * 2 * 2, presumably each DuplicatingOperator emits every
        // record twice - confirm against the operator's definition.
        Assert.assertEquals(40L, recordsOutCounter.getCount());

        harness.endInput();
        harness.waitForTaskCompletion();
    }

    /**
     * Chains two {@code WatermarkMetricOperator}s, intercepts the task and operator metric groups,
     * and checks the "currentInputWatermark" / "currentOutputWatermark" gauges before any
     * watermark and after watermarks 1 and 2 have been processed.
     */
    @Test
    public void testWatermarkMetrics() throws Exception {
        OneInputStreamTaskTestHarness harness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        WatermarkMetricOperator headOperator = new WatermarkMetricOperator();
        final OperatorID headOperatorId = new OperatorID();
        WatermarkMetricOperator chainedOperator = new WatermarkMetricOperator();
        final OperatorID chainedOperatorId = new OperatorID();
        harness.setupOperatorChain(headOperatorId, (StreamOperator<?>) headOperator)
                .chain(chainedOperatorId, chainedOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl()))
                .finish();

        final InterceptingOperatorMetricGroup headMetricGroup = new InterceptingOperatorMetricGroup();
        final InterceptingOperatorMetricGroup chainedMetricGroup = new InterceptingOperatorMetricGroup();
        // Routes each of the two operator ids to its own intercepting group; anything else
        // falls through to the parent implementation.
        final InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
            public InternalOperatorMetricGroup getOrAddOperator(OperatorID requestedId, String name) {
                if (requestedId.equals(headOperatorId)) {
                    return headMetricGroup;
                }
                if (requestedId.equals(chainedOperatorId)) {
                    return chainedMetricGroup;
                }
                return super.getOrAddOperator(requestedId, name);
            }
        };

        harness.invoke(new StreamMockEnvironment(harness.jobConfig, harness.taskConfig, harness.memorySize, new MockInputSplitProvider(), harness.bufferSize, new TestTaskStateManager()) {
            @Override
            public TaskMetricGroup getMetricGroup() {
                return taskMetricGroup;
            }
        });
        harness.waitForTaskRunning();

        Gauge taskInputGauge = taskMetricGroup.get("currentInputWatermark");
        Gauge headInputGauge = headMetricGroup.get("currentInputWatermark");
        Gauge headOutputGauge = headMetricGroup.get("currentOutputWatermark");
        Gauge chainedInputGauge = chainedMetricGroup.get("currentInputWatermark");
        Gauge chainedOutputGauge = chainedMetricGroup.get("currentOutputWatermark");
        Gauge[] allGauges = {taskInputGauge, headInputGauge, headOutputGauge, chainedInputGauge, chainedOutputGauge};

        // Five distinct gauge instances are expected; a smaller set means one was reused.
        Assert.assertEquals("A metric was registered multiple times.", 5L, new HashSet(Arrays.asList(allGauges)).size());

        // Before any watermark arrives every gauge reports Long.MIN_VALUE.
        for (Gauge watermarkGauge : allGauges) {
            Assert.assertEquals(Long.MIN_VALUE, ((Long) watermarkGauge.getValue()).longValue());
        }

        // NOTE(review): the expected values below suggest each WatermarkMetricOperator forwards
        // twice the incoming watermark (1 -> 2 -> 4, 2 -> 4 -> 8) - confirm against the operator.
        harness.processElement(new Watermark(1L));
        harness.waitForInputProcessing();
        Assert.assertEquals(1L, ((Long) taskInputGauge.getValue()).longValue());
        Assert.assertEquals(1L, ((Long) headInputGauge.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) headOutputGauge.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) chainedInputGauge.getValue()).longValue());
        Assert.assertEquals(4L, ((Long) chainedOutputGauge.getValue()).longValue());

        harness.processElement(new Watermark(2L));
        harness.waitForInputProcessing();
        Assert.assertEquals(2L, ((Long) taskInputGauge.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) headInputGauge.getValue()).longValue());
        Assert.assertEquals(4L, ((Long) headOutputGauge.getValue()).longValue());
        Assert.assertEquals(4L, ((Long) chainedInputGauge.getValue()).longValue());
        Assert.assertEquals(8L, ((Long) chainedOutputGauge.getValue()).longValue());

        harness.endInput();
        harness.waitForTaskCompletion();
    }

    /**
     * Starts a task whose task metric group records registrations into a map and checks that
     * the "checkpointAlignmentTime" and "checkpointStartDelayNanos" keys are present once the
     * task is running.
     */
    @Test
    public void testCheckpointBarrierMetrics() throws Exception {
        OneInputStreamTaskTestHarness harness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        harness.setupOutputForSingletonOperatorChain();
        harness.getStreamConfig().setStreamOperator(new TestOperator());

        // The map is handed to the metric group factory and inspected after the task is running.
        ConcurrentHashMap registeredMetrics = new ConcurrentHashMap();
        TaskMetricGroup taskMetricGroup = StreamTaskTestHarness.createTaskMetricGroup(registeredMetrics);

        StreamMockEnvironment environment = harness.createEnvironment();
        environment.setTaskMetricGroup(taskMetricGroup);

        harness.invoke(environment);
        harness.waitForTaskRunning();

        MatcherAssert.assertThat(registeredMetrics, IsMapContaining.hasKey("checkpointAlignmentTime"));
        MatcherAssert.assertThat(registeredMetrics, IsMapContaining.hasKey("checkpointStartDelayNanos"));

        harness.endInput();
        harness.waitForTaskCompletion();
    }

    /**
     * Checks the {@code CanEmitBatchOfRecordsChecker} of a one-input task: per the assertions
     * below it must report {@code true} only while the additional output is available and no
     * mail is pending in the main mailbox.
     *
     * <p>The harness is managed by try-with-resources, so it is closed on every exit path and a
     * failure in {@code close()} is attached as a suppressed exception to the primary failure.
     */
    @Test
    public void testCanEmitBatchOfRecords() throws Exception {
        AvailabilityProvider.AvailabilityHelper availabilityHelper = new AvailabilityProvider.AvailabilityHelper();
        try (StreamTaskMailboxTestHarness harness =
                ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                                .addInput(BasicTypeInfo.INT_TYPE_INFO)
                                .addAdditionalOutput(new ResultPartitionWriterWithAvailabilityHelper(availabilityHelper))
                                .setupOperatorChain((StreamOperator<?>) new TestOperator())
                                .finishForSingletonOperatorChain(IntSerializer.INSTANCE))
                        .build()) {
            StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords = harness.streamTask.getCanEmitBatchOfRecords();
            harness.processAll();

            // Output availability toggles the result while the mailbox is empty.
            availabilityHelper.resetAvailable();
            Assert.assertTrue(canEmitBatchOfRecords.check());
            availabilityHelper.resetUnavailable();
            Assert.assertFalse(canEmitBatchOfRecords.check());
            availabilityHelper.resetAvailable();
            Assert.assertTrue(canEmitBatchOfRecords.check());

            // A pending (no-op) mail blocks batching until it has been processed.
            harness.streamTask.mainMailboxExecutor.execute(() -> {
            }, "mail");
            Assert.assertFalse(canEmitBatchOfRecords.check());
            harness.processAll();
            Assert.assertTrue(canEmitBatchOfRecords.check());
        }
    }

    /**
     * Feeds five even and three odd integers through a chain of {@code PassThroughOperator},
     * {@code OddEvenOperator} (with an "odd" side output) and {@code DuplicatingOperator}, and
     * checks the task-level numRecordsIn / numRecordsOut counters.
     *
     * <p>The harness is managed by try-with-resources so that it is closed even when an
     * assertion or {@code processElement} throws; the result partition writers are closed
     * afterwards in the {@code finally} block.
     */
    @Test
    public void testTaskSideOutputStatistics() throws Exception {
        final int numEvenRecords = 5;
        final int numOddRecords = 3;

        TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[3];
        for (int i = 0; i < partitionWriters.length; i++) {
            partitionWriters[i] = new RecordOrEventCollectingResultPartitionWriter(new ArrayDeque(), new StreamElementSerializer(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new SerializerConfigImpl())));
            partitionWriters[i].setup();
        }
        try (StreamTaskMailboxTestHarness harness =
                ((StreamTaskMailboxTestHarnessBuilder) new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                                .addInput(BasicTypeInfo.INT_TYPE_INFO)
                                .addAdditionalOutput(partitionWriters)
                                .setupOperatorChain(new OperatorID(), (StreamOperator<?>) new PassThroughOperator())
                                .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new SerializerConfigImpl()))
                                .setOperatorFactory(SimpleOperatorFactory.of(new OddEvenOperator()))
                                .addNonChainedOutputsCount(new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO), 2)
                                .addNonChainedOutputsCount(1)
                                .build()
                                .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer(new SerializerConfigImpl()))
                                .setOperatorFactory(SimpleOperatorFactory.of(new DuplicatingOperator()))
                                .addNonChainedOutputsCount(1)
                                .build()
                                .finish())
                        .setTaskMetricGroup(taskMetricGroup)
                        .build()) {
            Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
            Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();

            // Even values 0, 2, 4, 6, 8 ...
            for (int i = 0; i < numEvenRecords; i++) {
                harness.processElement(new StreamRecord(Integer.valueOf(2 * i)));
            }
            // ... followed by odd values 1, 3, 5.
            for (int i = 0; i < numOddRecords; i++) {
                harness.processElement(new StreamRecord(Integer.valueOf((2 * i) + 1)));
            }

            Assert.assertEquals(numEvenRecords + numOddRecords, numRecordsInCounter.getCount());
            // NOTE(review): 27 presumably breaks down as 3 (odd side output) + 8 (OddEvenOperator
            // main output) + 16 (DuplicatingOperator output) - confirm against the operators.
            Assert.assertEquals(27L, numRecordsOutCounter.getCount());
            harness.waitForTaskCompletion();
        } finally {
            for (ResultPartitionWriter partitionWriter : partitionWriters) {
                partitionWriter.close();
            }
        }
    }

    /**
     * Configures {@code streamConfig} as the head of a chain of {@code i} {@code
     * TestingStreamOperator}s.
     *
     * <p>The head operator gets {@code OperatorID(0, 0)}; each further operator {@code k} (for
     * {@code 1 <= k < i}) gets its own {@link StreamConfig} with {@code OperatorID(0, k)}, a
     * single String network input, and a chained edge from node {@code k - 1} to node {@code k}.
     * The per-operator configs are serialized before being stored on the head config.
     *
     * @param streamConfig head configuration to populate
     * @param i total number of operators in the chain, head included; must be at least 1
     * @throws IllegalArgumentException if {@code i < 1} (assuming {@code Preconditions.checkArgument}
     *     follows the usual contract - confirm against its declaration)
     */
    private void configureChainedTestingStreamOperator(StreamConfig streamConfig, int i) {
        Preconditions.checkArgument(i >= 1, "The operator chain must at least contain one operator.");
        streamConfig.setStreamOperator(new TestingStreamOperator());
        streamConfig.setOperatorID(new OperatorID(0L, 0L));

        // Parameterized collections: with raw types the forEach lambda parameter would be typed
        // as Object and the serializeAllConfigs() call would not type-check.
        HashMap<Integer, StreamConfig> chainedTaskConfigs = new HashMap<>(i - 1);
        ArrayList<StreamEdge> chainedOutputs = new ArrayList<>(i - 1);
        for (int chainIndex = 1; chainIndex < i; chainIndex++) {
            TestingStreamOperator chainedOperator = new TestingStreamOperator();
            StreamConfig chainedConfig = new StreamConfig(new Configuration());
            chainedConfig.setupNetworkInputs(new TypeSerializer[]{StringSerializer.INSTANCE});
            chainedConfig.setStreamOperator(chainedOperator);
            chainedConfig.setOperatorID(new OperatorID(0L, chainIndex));
            chainedTaskConfigs.put(Integer.valueOf(chainIndex), chainedConfig);

            // The explicit null casts pick the intended StreamNode / StreamEdge constructor overloads.
            StreamNode sourceNode = new StreamNode(Integer.valueOf(chainIndex - 1), (String) null, (String) null, (StreamOperator) null, (String) null, (Class) null);
            StreamNode targetNode = new StreamNode(Integer.valueOf(chainIndex), (String) null, (String) null, (StreamOperator) null, (String) null, (Class) null);
            chainedOutputs.add(new StreamEdge(sourceNode, targetNode, 0, (StreamPartitioner) null, (OutputTag) null));
        }
        streamConfig.setChainedOutputs(chainedOutputs);
        chainedTaskConfigs.values().forEach(StreamConfig::serializeAllConfigs);
        streamConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedTaskConfigs);
    }
}
