/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriterWithAvailabilityHelper;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskChainedSourcesCheckpointingTest;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpointsTest;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestFinishedOnRestoreStreamOperator;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.CompletingCheckpointResponder;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ExtendWith(value={ParameterizedTestExtension.class})
class MultipleInputStreamTaskTest {
    // Static log of life-cycle event names shared by all tests in this class. It is cleared in
    // setUp() and asserted on in testLifeCycleOrder(); presumably the LifeCycleTracking* helpers
    // (defined outside this view) append to it.
    private static final List<String> LIFE_CYCLE_EVENTS = new ArrayList<String>();
    // Test parameter; the candidate values are supplied by parameters() (true and false).
    @Parameter
    private boolean objectReuse;
    // Extension used by testCopyForObjectReuse() to register a list that the test inspects later.
    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();

    // Explicit package-private no-arg constructor with an empty body.
    MultipleInputStreamTaskTest() {
    }

    /**
     * Supplies the candidate values for the {@code objectReuse} test parameter.
     *
     * @return a fixed-size collection holding {@code true} followed by {@code false}
     */
    @Parameters(name="objectReuse = {0}")
    private static Collection<Boolean> parameters() {
        Boolean[] objectReuseModes = {Boolean.TRUE, Boolean.FALSE};
        return Arrays.asList(objectReuseModes);
    }

    /**
     * Empties the shared {@code LIFE_CYCLE_EVENTS} list before each test so that events recorded
     * by an earlier test are not visible to the next one.
     */
    @BeforeEach
    void setUp() {
        LIFE_CYCLE_EVENTS.clear();
    }

    /**
     * Sends two records through the source (id 1 in the {@code addSourceRecords} call) and one
     * record through each of the inputs with index 0 and 1, ends the input, and verifies that all
     * four records show up on the output in their string form. The order is not checked.
     */
    @TestTemplate
    void testBasicProcessing() throws Exception {
        try (StreamTaskMailboxTestHarness<String> harness = MultipleInputStreamTaskTest.buildTestHarness(this.objectReuse)) {
            final long baseTimestamp = 0L;
            ArrayDeque<Object> expected = new ArrayDeque<>();

            // Records coming from the source are expected with Long.MIN_VALUE as their timestamp.
            MultipleInputStreamTaskTest.addSourceRecords(harness, 1, 42, 43);
            expected.add(new StreamRecord<>("42", Long.MIN_VALUE));
            expected.add(new StreamRecord<>("43", Long.MIN_VALUE));

            // A String record on input 0 keeps its value and timestamp.
            harness.processElement(new StreamRecord<>("Hello", baseTimestamp + 1L), 0);
            expected.add(new StreamRecord<>("Hello", baseTimestamp + 1L));

            // A Double record on input 1 is expected as its string representation.
            harness.processElement(new StreamRecord<>(42.44, baseTimestamp + 3L), 1);
            expected.add(new StreamRecord<>("42.44", baseTimestamp + 3L));

            harness.endInput();
            harness.waitForTaskCompletion();

            Assertions.assertThat(harness.getOutput()).containsExactlyInAnyOrderElementsOf(expected);
        }
    }

    /**
     * Emits two source records through a {@code CopyProxySerializer} that is handed a shared list
     * (presumably the list of elements it copied - the serializer is defined outside this view).
     * With object reuse enabled the list must stay empty; otherwise it must hold exactly the two
     * emitted values.
     */
    @TestTemplate
    void testCopyForObjectReuse() throws Exception {
        SharedReference<List<Integer>> copiedElements = this.sharedObjects.add(new ArrayList<>());
        CopyProxySerializer copyTrackingSerializer = new CopyProxySerializer(copiedElements);
        SourceOperatorFactory boundedSourceFactory = new SourceOperatorFactory((Source)new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks());

        try (StreamTaskMailboxTestHarness<String> harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse))
                        .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO)
                        .addSourceInput(boundedSourceFactory, copyTrackingSerializer)
                        .addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3))
                        .build()) {
            MultipleInputStreamTaskTest.addSourceRecords(harness, 1, 42, 43);
            harness.endInput();
            harness.waitForTaskCompletion();

            List<Integer> copied = copiedElements.get();
            if (!this.objectReuse) {
                // Without object reuse both emitted values must have been recorded.
                Assertions.assertThat(copied).containsExactlyInAnyOrder(42, 43);
            } else {
                // With object reuse nothing may have been recorded.
                Assertions.assertThat(copied).isEmpty();
            }
        }
    }

    /**
     * Sends the barrier of checkpoint 0 on one channel first, checks that records arriving on
     * other channels are still forwarded, then delivers the barrier on the five remaining
     * channels and expects a single barrier on the output after the records.
     *
     * <p>The two trailing int arguments of {@code processEvent}/{@code processElement} are
     * presumably (input index, channel index); three inputs with two channels each are set up.
     */
    @TestTemplate
    void testCheckpointBarriers() throws Exception {
        try (StreamTaskMailboxTestHarness<String> harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3))
                        .build()) {
            final long baseTimestamp = 0L;
            ArrayDeque<Object> expected = new ArrayDeque<>();

            // First barrier of checkpoint 0, on (0, 0) only.
            harness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);

            // Records on channels that have not yet seen the barrier.
            harness.processElement(new StreamRecord<>("Ciao-0-0", baseTimestamp), 0, 1);
            harness.processElement(new StreamRecord<>(11, baseTimestamp), 1, 1);
            harness.processElement(new StreamRecord<>(1.0, baseTimestamp), 2, 0);

            expected.add(new StreamRecord<>("Ciao-0-0", baseTimestamp));
            expected.add(new StreamRecord<>("11", baseTimestamp));
            expected.add(new StreamRecord<>("1.0", baseTimestamp));
            Assertions.assertThat(harness.getOutput()).containsExactlyElementsOf(expected);

            // Deliver the barrier on the remaining channels, in the same order as before.
            int[][] remainingChannels = {{0, 1}, {1, 0}, {1, 1}, {2, 0}, {2, 1}};
            for (int[] inputAndChannel : remainingChannels) {
                harness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), inputAndChannel[0], inputAndChannel[1]);
            }

            expected.add(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()));
            Assertions.assertThat(harness.getOutput()).containsExactlyElementsOf(expected);
        }
    }

    /**
     * Covers a newer checkpoint overtaking an older, still incomplete one: the barrier of
     * checkpoint 0 arrives on a single channel, then barriers of checkpoint 1 arrive on all
     * channels. The test expects a {@code CancelCheckpointMarker} for checkpoint 0 followed by the
     * barrier of checkpoint 1, and no further output when the late barriers of checkpoint 0 show up.
     *
     * <p>NOTE(review): the two trailing int arguments of {@code processEvent}/{@code processElement}
     * look like (input index, channel index) - confirm against the harness.
     */
    @TestTemplate
    void testOvertakingCheckpointBarriers() throws Exception {
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 2).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO, 2).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).build();){
            ArrayDeque<Object> expectedOutput = new ArrayDeque<Object>();
            long initialTime = 0L;
            // Checkpoint 0 starts: its barrier is delivered on (0, 0) only.
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
            // Records on the other channels are expected on the output in their string form.
            testHarness.processElement(new StreamRecord((Object)"Witam-0-1", initialTime), 0, 1);
            testHarness.processElement(new StreamRecord((Object)42, initialTime), 1, 1);
            testHarness.processElement(new StreamRecord((Object)1.0, initialTime), 2, 1);
            expectedOutput.add(new StreamRecord((Object)"Witam-0-1", initialTime));
            expectedOutput.add(new StreamRecord((Object)"42", initialTime));
            expectedOutput.add(new StreamRecord((Object)"1.0", initialTime));
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // Barriers of checkpoint 1 are delivered on all six channels; (0, 0), which already saw
            // the barrier of checkpoint 0, comes last.
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 0);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
            // Expected: a cancel marker for checkpoint 0, then the barrier of checkpoint 1.
            expectedOutput.add(new CancelCheckpointMarker(0L));
            expectedOutput.add(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()));
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // Late barriers of checkpoint 0 on the remaining channels: the expected output is not
            // extended, i.e. they must not produce anything downstream.
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 0);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 1);
            testHarness.waitForTaskCompletion();
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
        }
    }

    /**
     * Verifies the record counters of a task whose main operator is followed by three chained
     * {@code DuplicatingOperator}s.
     *
     * <p>Five records are sent on input 0, two on input 1 and three through the source. The main
     * operator's records-in counter must equal all ten, the task-level records-in counter only the
     * seven records that were sent through {@code processElement}, and the task-level records-out
     * counter ten times 2 * 2 * 2.
     */
    @TestTemplate
    void testMetrics() throws Exception {
        HashMap<String, OperatorMetricGroup> metricGroupsByOperatorName = new HashMap<>();
        // Task metric group that remembers every operator metric group it hands out, keyed by name.
        UnregisteredMetricGroups.UnregisteredTaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup(){

            @Override
            public InternalOperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
                InternalOperatorMetricGroup created = super.getOrAddOperator(operatorID, name);
                metricGroupsByOperatorName.put(name, created);
                return created;
            }
        };
        String mainOperatorName = "MainOperator";

        try (StreamTaskMailboxTestHarness<String> harness =
                ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse))
                        .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO)
                        .addSourceInput(new SourceOperatorFactory((Source)new LifeCycleTrackingMockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()), BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO)
                        .setupOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3))
                        .name(mainOperatorName)
                        .chain((OneInputStreamOperator)new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                        .chain((OneInputStreamOperator)new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                        .chain((OneInputStreamOperator)new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                        .finish())
                        .setTaskMetricGroup((TaskMetricGroup)taskMetricGroup)
                        .build()) {
            Assertions.assertThat(metricGroupsByOperatorName).containsKey(mainOperatorName);
            OperatorMetricGroup mainOperatorMetrics = metricGroupsByOperatorName.get(mainOperatorName);
            Counter taskRecordsIn = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
            Counter taskRecordsOut = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();

            final int firstInputRecords = 5;
            final int sourceRecords = 3;
            final int secondInputRecords = 2;

            // The three source records (sourceRecords) are added before any network element.
            MultipleInputStreamTaskTest.addSourceRecords(harness, 1, 42, 43, 44);
            for (int i = 0; i < firstInputRecords; i++) {
                harness.processElement(new StreamRecord<>("hello"), 0, 0);
            }
            for (int i = 0; i < secondInputRecords; i++) {
                harness.processElement(new StreamRecord<>("hello"), 1, 0);
            }

            long expectedNetworkRecordsIn = firstInputRecords + secondInputRecords;
            long expectedMainOperatorRecordsIn = expectedNetworkRecordsIn + sourceRecords;
            // Each of the three chained duplicating operators doubles the record count.
            long expectedRecordsOut = expectedMainOperatorRecordsIn * 2 * 2 * 2;

            Assertions.assertThat(mainOperatorMetrics.getIOMetricGroup().getNumRecordsInCounter().getCount()).isEqualTo(expectedMainOperatorRecordsIn);
            Assertions.assertThat(taskRecordsIn.getCount()).isEqualTo(expectedNetworkRecordsIn);
            Assertions.assertThat(taskRecordsOut.getCount()).isEqualTo(expectedRecordsOut);
            harness.waitForTaskCompletion();
        }
    }

    /**
     * Runs a task made of a life-cycle-tracking multiple-input operator (two network inputs plus a
     * bounded {@code LifeCycleTrackingMockSource}) followed by a chained {@code LifeCycleTrackingMap},
     * waits for the task to complete and checks the order of the recorded life-cycle events.
     *
     * <p>The events are verified with {@code containsSubsequence}. Plain {@code contains} only
     * checks membership: it would pass if the callbacks fired in a different order, and it cannot
     * tell one {@code endInput} event from the three that are expected. {@code containsSubsequence}
     * requires all listed events, duplicates included, to occur in this relative order while still
     * tolerating additional events in between.
     */
    @TestTemplate
    void testLifeCycleOrder() throws Exception {
        try (StreamTaskMailboxTestHarness testHarness =
                ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse))
                        .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO)
                        .addSourceInput(new SourceOperatorFactory((Source)new LifeCycleTrackingMockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()), BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO)
                        .setupOperatorChain((StreamOperatorFactory<?>)new LifeCycleTrackingMapToStringMultipleInputOperatorFactory())
                        .chain(new LifeCycleTrackingMap(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                        .finish())
                        .build()) {
            testHarness.waitForTaskCompletion();
        }
        // Order-sensitive check; the three endInput entries correspond to the three inputs.
        Assertions.assertThat(LIFE_CYCLE_EVENTS).containsSubsequence(
                "LifeCycleTrackingMap#open",
                "MultipleInputOperator#open",
                "SourceReader#start",
                "MultipleInputOperator#endInput",
                "MultipleInputOperator#endInput",
                "MultipleInputOperator#endInput",
                "MultipleInputOperator#finish",
                "LifeCycleTrackingMap#endInput",
                "LifeCycleTrackingMap#close",
                "MultipleInputOperator#close",
                "SourceReader#close");
    }

    /**
     * Queues four records on input 0 and two records on input 2 with auto-processing switched
     * off, then processes everything at once. The expected output alternates between the two
     * inputs while both have data ("0", "0", "1", "1") and then drains the rest of input 0
     * ("2", "3").
     */
    @TestTemplate
    void testInputFairness() throws Exception {
        try (StreamTaskMailboxTestHarness<String> harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3))
                        .build()) {
            // Buffer the elements first; nothing is processed until processAll() is called.
            harness.setAutoProcess(false);
            for (String value : new String[] {"0", "1", "2", "3"}) {
                harness.processElement(new StreamRecord<>(value), 0);
            }
            for (String value : new String[] {"0", "1"}) {
                harness.processElement(new StreamRecord<>(value), 2);
            }
            harness.processAll();

            ArrayDeque<Object> expected = new ArrayDeque<>();
            for (String value : new String[] {"0", "0", "1", "1", "2", "3"}) {
                expected.add(new StreamRecord<>(value));
            }
            Assertions.assertThat(harness.getOutput()).containsExactlyElementsOf(expected);
        }
    }

    /**
     * Checks watermark forwarding across several inputs: a watermark is only expected on the
     * output once the last outstanding channel has delivered one, and the emitted value follows
     * the lowest watermark seen (see the step-by-step comments below). Source records are expected
     * on the output with {@code Long.MIN_VALUE} as timestamp.
     *
     * <p>NOTE(review): the two trailing int arguments of {@code processElement} look like
     * (input index, channel index), and the arguments of {@code buildWatermarkTestHarness(2, false)}
     * are defined outside this view - confirm both.
     */
    @TestTemplate
    void testWatermark() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = this.buildWatermarkTestHarness(2, false);){
            ArrayDeque<Object> expectedOutput = new ArrayDeque<Object>();
            // Kept as int because it is also passed to addSourceRecords(...) as a record value.
            int initialTime = 0;
            // Watermark 0 on (0, 0), (0, 1) and (1, 0), plus one source record: only the record is
            // expected so far, no watermark.
            testHarness.processElement(new Watermark((long)initialTime), 0, 0);
            testHarness.processElement(new Watermark((long)initialTime), 0, 1);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, initialTime);
            expectedOutput.add(new StreamRecord((Object)("" + initialTime), Long.MIN_VALUE));
            testHarness.processElement(new Watermark((long)initialTime), 1, 0);
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // Watermark 0 on the last channel (1, 1): now Watermark(0) is expected on the output.
            testHarness.processElement(new Watermark((long)initialTime), 1, 1);
            expectedOutput.add(new Watermark((long)initialTime));
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // Regular records are forwarded in their string form with their timestamps.
            testHarness.processElement(new StreamRecord((Object)"Hello", (long)initialTime), 0, 0);
            testHarness.processElement(new StreamRecord((Object)42.0, (long)initialTime), 1, 1);
            expectedOutput.add(new StreamRecord((Object)"Hello", (long)initialTime));
            expectedOutput.add(new StreamRecord((Object)"42.0", (long)initialTime));
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // Watermarks 4/3 on input 0, source record 3, watermarks 3/2 on input 1: the expected
            // output watermark is 2, the lowest of them.
            testHarness.processElement(new Watermark((long)(initialTime + 4)), 0, 0);
            testHarness.processElement(new Watermark((long)(initialTime + 3)), 0, 1);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, initialTime + 3);
            expectedOutput.add(new StreamRecord((Object)("" + (initialTime + 3)), Long.MIN_VALUE));
            testHarness.processElement(new Watermark((long)(initialTime + 3)), 1, 0);
            testHarness.processElement(new Watermark((long)(initialTime + 2)), 1, 1);
            expectedOutput.add(new Watermark((long)(initialTime + 2)));
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // (1, 1) advances from 2 to 4: the expected output watermark moves to 3.
            testHarness.processElement(new Watermark((long)(initialTime + 4)), 1, 1);
            expectedOutput.add(new Watermark((long)(initialTime + 3)));
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // (0, 1) and (1, 0) advance to 4 and the source emits record 4: Watermark(4) is expected.
            testHarness.processElement(new Watermark((long)(initialTime + 4)), 0, 1);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, initialTime + 4);
            expectedOutput.add(new StreamRecord((Object)("" + (initialTime + 4)), Long.MIN_VALUE));
            testHarness.processElement(new Watermark((long)(initialTime + 4)), 1, 0);
            expectedOutput.add(new Watermark((long)(initialTime + 4)));
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // Five records were added to expectedOutput above: three from the source, two from the
            // network inputs.
            List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
            Assertions.assertThat((List)resultElements).hasSize(5);
        }
    }

    /**
     * Checks how watermarks and {@code WatermarkStatus} changes are forwarded when channels turn
     * idle and active again; the expected output after every step is spelled out below.
     *
     * <p>NOTE(review): the two trailing int arguments of {@code processElement} look like
     * (input index, channel index), and the arguments of {@code buildWatermarkTestHarness(2, true)}
     * are defined outside this view - confirm both.
     */
    @TestTemplate
    void testWatermarkAndWatermarkStatusForwarding() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = this.buildWatermarkTestHarness(2, true);){
            ArrayDeque<Object> expectedOutput = new ArrayDeque<Object>();
            int initialTime = 0;
            // (0, 1) idle, watermark 6 on (0, 0), watermark 5 on (1, 1), (1, 0) idle:
            // Watermark(5) is expected.
            testHarness.processElement(WatermarkStatus.IDLE, 0, 1);
            testHarness.processElement(new Watermark((long)(initialTime + 6)), 0, 0);
            testHarness.processElement(new Watermark((long)(initialTime + 5)), 1, 1);
            testHarness.processElement(WatermarkStatus.IDLE, 1, 0);
            expectedOutput.add(new Watermark((long)(initialTime + 5)));
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // (1, 1) turns idle as well: Watermark(6), the value sent on (0, 0), is expected next.
            testHarness.processElement(WatermarkStatus.IDLE, 1, 1);
            expectedOutput.add(new Watermark((long)(initialTime + 6)));
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // (0, 0) turns idle too: WatermarkStatus.IDLE is expected on the output.
            testHarness.processElement(WatermarkStatus.IDLE, 0, 0);
            expectedOutput.add(WatermarkStatus.IDLE);
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // A source record with value 10: expected are ACTIVE, the record, Watermark(10) and
            // then IDLE again, once everything has been processed.
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, initialTime + 10);
            expectedOutput.add(WatermarkStatus.ACTIVE);
            expectedOutput.add(new StreamRecord((Object)("" + (initialTime + 10)), Long.MIN_VALUE));
            expectedOutput.add(new Watermark((long)(initialTime + 10)));
            expectedOutput.add(WatermarkStatus.IDLE);
            testHarness.processAll();
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
            // (0, 1) becomes active again: WatermarkStatus.ACTIVE is expected on the output.
            testHarness.processElement(WatermarkStatus.ACTIVE, 0, 1);
            expectedOutput.add(WatermarkStatus.ACTIVE);
            Assertions.assertThat(testHarness.getOutput()).containsExactlyElementsOf(expectedOutput);
        }
    }

    /**
     * Sends {@code Watermark.MAX_WATERMARK} on the (0, 0) and (0, 1) channels, calls
     * {@code advanceToEndOfEventTime()} on the task and then sends it on (1, 0): at that point the
     * output must not contain the max watermark. Only after (1, 1) has delivered it as well is
     * the max watermark expected as the sole output element.
     */
    @TestTemplate
    void testAdvanceToEndOfEventTime() throws Exception {
        try (StreamTaskMailboxTestHarness<String> harness = this.buildWatermarkTestHarness(2, false)) {
            final Watermark maxWatermark = Watermark.MAX_WATERMARK;

            harness.processElement(maxWatermark, 0, 0);
            harness.processElement(maxWatermark, 0, 1);
            harness.getStreamTask().advanceToEndOfEventTime();
            harness.processElement(maxWatermark, 1, 0);

            // One channel is still outstanding, so nothing may have been emitted yet.
            Assertions.assertThat(harness.getOutput()).doesNotContain(maxWatermark);

            harness.processElement(maxWatermark, 1, 1);
            Assertions.assertThat(harness.getOutput()).containsExactly(maxWatermark);
        }
    }

    @TestTemplate
    void testWatermarkMetrics() throws Exception {
        final OperatorID mainOperatorId = new OperatorID();
        final OperatorID chainedOperatorId = new OperatorID();
        final InterceptingOperatorMetricGroup mainOperatorMetricGroup = new InterceptingOperatorMetricGroup();
        final InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
        InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup(){

            public InternalOperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
                if (id.equals((Object)mainOperatorId)) {
                    return mainOperatorMetricGroup;
                }
                if (id.equals((Object)chainedOperatorId)) {
                    return chainedOperatorMetricGroup;
                }
                return super.getOrAddOperator(id, name);
            }
        };
        try (StreamTaskMailboxTestHarness<String> testHarness = ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse)).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory((Source)new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2, true, false), WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new RecordToWatermarkGenerator())), BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO).setupOperatorChain(mainOperatorId, (StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).chain(chainedOperatorId, (OneInputStreamOperator)new OneInputStreamTaskTest.WatermarkMetricOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl())).finish()).setTaskMetricGroup((TaskMetricGroup)taskMetricGroup).build();){
            Gauge taskInputWatermarkGauge = (Gauge)taskMetricGroup.get("currentInputWatermark");
            Gauge mainInput1WatermarkGauge = (Gauge)mainOperatorMetricGroup.get(MetricNames.currentInputWatermarkName((int)1));
            Gauge mainInput2WatermarkGauge = (Gauge)mainOperatorMetricGroup.get(MetricNames.currentInputWatermarkName((int)2));
            Gauge mainInput3WatermarkGauge = (Gauge)mainOperatorMetricGroup.get(MetricNames.currentInputWatermarkName((int)3));
            Gauge mainInputWatermarkGauge = (Gauge)mainOperatorMetricGroup.get("currentInputWatermark");
            Gauge mainOutputWatermarkGauge = (Gauge)mainOperatorMetricGroup.get("currentOutputWatermark");
            Gauge chainedInputWatermarkGauge = (Gauge)chainedOperatorMetricGroup.get("currentInputWatermark");
            Gauge chainedOutputWatermarkGauge = (Gauge)chainedOperatorMetricGroup.get("currentOutputWatermark");
            Assertions.assertThat((Long)((Long)taskInputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)taskInputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainInputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainInput1WatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainInput2WatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainInput3WatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainOutputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)chainedInputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)chainedOutputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            testHarness.processElement(new Watermark(1L), 0);
            Assertions.assertThat((Long)((Long)taskInputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainInputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainInput1WatermarkGauge.getValue())).isOne();
            Assertions.assertThat((Long)((Long)mainInput2WatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainInput3WatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainOutputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)chainedInputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)chainedOutputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, 2);
            testHarness.processAll();
            Assertions.assertThat((Long)((Long)taskInputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainInputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainInput1WatermarkGauge.getValue())).isOne();
            Assertions.assertThat((Long)((Long)mainInput2WatermarkGauge.getValue())).isEqualTo(2L);
            Assertions.assertThat((Long)((Long)mainInput3WatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)mainOutputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)chainedInputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            Assertions.assertThat((Long)((Long)chainedOutputWatermarkGauge.getValue())).isEqualTo(Long.MIN_VALUE);
            testHarness.processElement(new Watermark(2L), 1);
            Assertions.assertThat((Long)((Long)taskInputWatermarkGauge.getValue())).isOne();
            Assertions.assertThat((Long)((Long)mainInputWatermarkGauge.getValue())).isOne();
            Assertions.assertThat((Long)((Long)mainInput1WatermarkGauge.getValue())).isOne();
            Assertions.assertThat((Long)((Long)mainInput2WatermarkGauge.getValue())).isEqualTo(2L);
            Assertions.assertThat((Long)((Long)mainInput3WatermarkGauge.getValue())).isEqualTo(2L);
            Assertions.assertThat((Long)((Long)mainOutputWatermarkGauge.getValue())).isOne();
            Assertions.assertThat((Long)((Long)chainedInputWatermarkGauge.getValue())).isOne();
            Assertions.assertThat((Long)((Long)chainedOutputWatermarkGauge.getValue())).isEqualTo(2L);
            testHarness.processElement(new Watermark(4L), 0);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, 3);
            testHarness.processAll();
            Assertions.assertThat((Long)((Long)taskInputWatermarkGauge.getValue())).isEqualTo(2L);
            Assertions.assertThat((Long)((Long)mainInputWatermarkGauge.getValue())).isEqualTo(2L);
            Assertions.assertThat((Long)((Long)mainInput1WatermarkGauge.getValue())).isEqualTo(4L);
            Assertions.assertThat((Long)((Long)mainInput2WatermarkGauge.getValue())).isEqualTo(3L);
            Assertions.assertThat((Long)((Long)mainInput3WatermarkGauge.getValue())).isEqualTo(2L);
            Assertions.assertThat((Long)((Long)mainOutputWatermarkGauge.getValue())).isEqualTo(2L);
            Assertions.assertThat((Long)((Long)chainedInputWatermarkGauge.getValue())).isEqualTo(2L);
            Assertions.assertThat((Long)((Long)chainedOutputWatermarkGauge.getValue())).isEqualTo(4L);
            this.finishAddingRecords(testHarness, 1);
            testHarness.endInput();
            testHarness.waitForTaskCompletion();
            testHarness.finishProcessing();
        }
    }

    /**
     * Builds a three-input task (two channels per input) on top of a metric group backed by a
     * plain map and asserts that the map contains the {@code checkpointAlignmentTime} and
     * {@code checkpointStartDelayNanos} keys once the task has been built.
     */
    @TestTemplate
    void testCheckpointBarrierMetrics() throws Exception {
        ConcurrentHashMap<String, Metric> registeredMetrics = new ConcurrentHashMap<String, Metric>();
        TaskMetricGroup metricGroup = StreamTaskTestHarness.createTaskMetricGroup(registeredMetrics);
        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3))
                        .setTaskMetricGroup(metricGroup)
                        .build()) {
            // Both checkpoint-related metrics have to be present.
            Assertions.assertThat(registeredMetrics).containsKey("checkpointAlignmentTime");
            Assertions.assertThat(registeredMetrics).containsKey("checkpointStartDelayNanos");

            harness.endInput();
            harness.waitForTaskCompletion();
        }
    }

    /**
     * Verifies the checker returned by {@code StreamTask#getCanEmitBatchOfRecords()}.
     *
     * <p>The checker is expected to report {@code true} only while the additional result partition
     * writer is available <em>and</em> the mailbox holds no pending mail; it must flip to
     * {@code false} as soon as either condition is violated and back to {@code true} once the
     * condition is restored. (Previously every step asserted {@code false}, which could not
     * distinguish the available state from the unavailable one.)
     */
    @TestTemplate
    void testCanEmitBatchOfRecords() throws Exception {
        AvailabilityProvider.AvailabilityHelper availabilityHelper = new AvailabilityProvider.AvailabilityHelper();
        try (StreamTaskMailboxTestHarness testHarness = ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO).addAdditionalOutput(new ResultPartitionWriter[]{new ResultPartitionWriterWithAvailabilityHelper(availabilityHelper)}).setupOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).finishForSingletonOperatorChain((TypeSerializer)IntSerializer.INSTANCE)).build();){
            StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecordsChecker = testHarness.streamTask.getCanEmitBatchOfRecords();
            // Drain any mail left over from task start-up so that the mailbox is empty.
            testHarness.processAll();

            // Writer available, mailbox empty: a batch may be emitted.
            availabilityHelper.resetAvailable();
            Assertions.assertThat((boolean)canEmitBatchOfRecordsChecker.check()).isTrue();

            // Writer unavailable: emitting a batch must be refused.
            availabilityHelper.resetUnavailable();
            Assertions.assertThat((boolean)canEmitBatchOfRecordsChecker.check()).isFalse();

            // Writer available again: emitting is allowed again.
            availabilityHelper.resetAvailable();
            Assertions.assertThat((boolean)canEmitBatchOfRecordsChecker.check()).isTrue();

            // A pending mail must block batch emission even though the writer is available.
            testHarness.streamTask.mainMailboxExecutor.execute(() -> {}, "mail");
            Assertions.assertThat((boolean)canEmitBatchOfRecordsChecker.check()).isFalse();

            // Once the mail has been processed the checker reports true again.
            testHarness.processAll();
            Assertions.assertThat((boolean)canEmitBatchOfRecordsChecker.check()).isTrue();
        }
    }

    /**
     * Sends a single {@code LatencyMarker} into a three-input task and asserts that the task
     * output consists of exactly that marker.
     */
    @TestTemplate
    void testLatencyMarker() throws Exception {
        ConcurrentHashMap<String, Metric> metricStore = new ConcurrentHashMap<String, Metric>();
        TaskMetricGroup metricGroup = StreamTaskTestHarness.createTaskMetricGroup(metricStore);
        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3))
                        .setTaskMetricGroup(metricGroup)
                        .build()) {
            OperatorID markerSource = new OperatorID();
            LatencyMarker marker = new LatencyMarker(42L, markerSource, 0);

            // The marker is the only element expected on the output.
            ArrayDeque<LatencyMarker> expected = new ArrayDeque<LatencyMarker>();
            expected.add(marker);

            harness.processElement(marker);
            Assertions.assertThat(harness.getOutput()).containsExactlyElementsOf(expected);

            harness.endInput();
            harness.waitForTaskCompletion();
        }
    }

    /**
     * Runs the finished-channels checkpoint scenario with options created through
     * {@code CheckpointOptions.alignedNoTimeout} for a regular checkpoint at the default location.
     */
    @TestTemplate
    void testTriggeringAlignedNoTimeoutCheckpointWithFinishedChannels() throws Exception {
        CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault();
        CheckpointOptions options = CheckpointOptions.alignedNoTimeout(CheckpointType.CHECKPOINT, location);
        this.testTriggeringCheckpointWithFinishedChannels(options);
    }

    /**
     * Runs the finished-channels checkpoint scenario with options created through
     * {@code CheckpointOptions.unaligned} for a regular checkpoint at the default location.
     */
    @TestTemplate
    void testTriggeringUnalignedCheckpointWithFinishedChannels() throws Exception {
        CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault();
        CheckpointOptions options = CheckpointOptions.unaligned(CheckpointType.CHECKPOINT, location);
        this.testTriggeringCheckpointWithFinishedChannels(options);
    }

    /**
     * Runs the finished-channels checkpoint scenario with options created through
     * {@code CheckpointOptions.alignedWithTimeout} (timeout argument {@code 10}) for a regular
     * checkpoint at the default location.
     */
    @TestTemplate
    void testTriggeringAlignedWithTimeoutCheckpointWithFinishedChannels() throws Exception {
        CheckpointStorageLocationReference location = CheckpointStorageLocationReference.getDefault();
        CheckpointOptions options = CheckpointOptions.alignedWithTimeout(CheckpointType.CHECKPOINT, location, 10L);
        this.testTriggeringCheckpointWithFinishedChannels(options);
    }

    /**
     * Shared scenario for the "checkpoint with finished channels" tests: triggers checkpoints 2, 4
     * and 6 on a three-input task while progressively more inputs have received
     * {@code EndOfData} / {@code EndOfPartitionEvent}, and asserts after each one that the task
     * state manager reports the corresponding checkpoint id.
     *
     * <p>Two {@code PIPELINED_BOUNDED} result partitions are attached as additional outputs and
     * are closed in the {@code finally} block regardless of the outcome.
     *
     * <p>NOTE: this source was produced by a decompiler, which emitted the warning "Removed try
     * catching itself - possible behaviour change." for this method.
     *
     * @param checkpointOptions options used for every triggered checkpoint; unaligned checkpoints
     *     are enabled in the stream config when these options are unaligned or timeoutable
     * @throws Exception if the harness or any of the triggered operations fails
     */
    private void testTriggeringCheckpointWithFinishedChannels(CheckpointOptions checkpointOptions) throws Exception {
        ResultPartition[] partitionWriters = new ResultPartition[2];
        try {
            // Create and set up the partitions inside the try so that partially created ones
            // are still closed by the finally block below.
            for (int i = 0; i < partitionWriters.length; ++i) {
                partitionWriters[i] = PartitionTestUtils.createPartition((ResultPartitionType)ResultPartitionType.PIPELINED_BOUNDED);
                partitionWriters[i].setup();
            }
            CompletingCheckpointResponder checkpointResponder = new CompletingCheckpointResponder();
            try (StreamTaskMailboxTestHarness<String> testHarness = ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO).addAdditionalOutput((ResultPartitionWriter[])partitionWriters).setCheckpointResponder(checkpointResponder).modifyStreamConfig(config -> {
                config.setCheckpointingEnabled(true);
                // Unaligned mode is needed both for unaligned and for timeoutable options.
                config.setUnalignedCheckpointsEnabled(checkpointOptions.isUnalignedCheckpoint() || checkpointOptions.isTimeoutable());
            }).setupOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).finishForSingletonOperatorChain((TypeSerializer)StringSerializer.INSTANCE)).build();){
                // Route the responder's complete/abort callbacks back into the task under test.
                checkpointResponder.setHandlers(arg_0 -> testHarness.streamTask.notifyCheckpointCompleteAsync(arg_0), (arg_0, arg_1) -> testHarness.streamTask.notifyCheckpointAbortAsync(arg_0, arg_1));
                // Result is discarded; presumably this only checks that a barrier handler is
                // present (get() on an empty holder would throw) - TODO confirm.
                testHarness.getStreamTask().getCheckpointBarrierHandler().get();

                // Checkpoint 2: no end-of-data / end-of-partition event has been delivered yet.
                CompletableFuture<Boolean> checkpointFuture = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 2L, checkpointOptions);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(testHarness, checkpointFuture);
                Assertions.assertThat((long)testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(2L);

                // Checkpoint 4: only index 0 has received EndOfData and EndOfPartition
                // (presumably the arguments are input index and channel index - TODO confirm).
                testHarness.processEvent((AbstractEvent)new EndOfData(StopMode.DRAIN), 0, 0);
                testHarness.processEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 0, 0);
                checkpointFuture = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 4L, checkpointOptions);
                StreamTaskFinalCheckpointsTest.processMailTillCheckpointSucceeds(testHarness, checkpointFuture);
                Assertions.assertThat((long)testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(4L);

                // Checkpoint 6: indices 1 and 2 have now been finished as well.
                testHarness.processEvent((AbstractEvent)new EndOfData(StopMode.DRAIN), 1, 0);
                testHarness.processEvent((AbstractEvent)new EndOfData(StopMode.DRAIN), 2, 0);
                testHarness.processEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 1, 0);
                testHarness.processEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 2, 0);
                checkpointFuture = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 6L, checkpointOptions);
                // Only after the last checkpoint future completes are the partitions told that
                // all data of subpartition 0 has been processed.
                checkpointFuture.thenAccept(ignored -> {
                    for (ResultPartition resultPartition : partitionWriters) {
                        resultPartition.onSubpartitionAllDataProcessed(0);
                    }
                });
                testHarness.processAll();
                testHarness.finishProcessing();
                Assertions.assertThat(checkpointFuture).isDone();
                // Wait for the state report before reading the reported checkpoint id.
                testHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assertions.assertThat((long)testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(6L);
                // NOTE(review): 4 queued buffers per partition presumably correspond to the three
                // checkpoint barriers plus one end-of-data event - confirm.
                for (ResultPartition resultPartition : partitionWriters) {
                    Assertions.assertThat((int)resultPartition.getNumberOfQueuedBuffers()).isEqualTo(4);
                }
            }
        }
        finally {
            // Slots may still be null if partition creation failed part-way through.
            for (ResultPartition writer : partitionWriters) {
                if (writer == null) continue;
                writer.close();
            }
        }
    }

    /**
     * Restores a three-input task from {@code TaskStateSnapshot.FINISHED_ON_RESTORE}, feeds
     * {@code Watermark.MAX_WATERMARK} into each input and asserts that the task output is exactly
     * one max watermark followed by an {@code EndOfData(StopMode.DRAIN)} event.
     */
    @TestTemplate
    void testSkipExecutionsIfFinishedOnRestore() throws Exception {
        OperatorID mainOperatorId = new OperatorID();
        try (StreamTaskMailboxTestHarness harness =
                ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .setCollectNetworkEvents()
                        .modifyStreamConfig(config -> config.setCheckpointingEnabled(true))
                        .modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse))
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                        .setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE)
                        .setupOperatorChain(mainOperatorId, (StreamOperatorFactory<?>)new MultipleInputStreamTaskChainedSourcesCheckpointingTest.LifeCycleMonitorMultipleInputOperatorFactory())
                        .chain((OneInputStreamOperator)new TestFinishedOnRestoreStreamOperator(), (TypeSerializer)StringSerializer.INSTANCE)
                        .finish())
                        .build()) {
            // One max watermark per input, in input order.
            for (int inputIndex = 0; inputIndex < 3; ++inputIndex) {
                harness.processElement(Watermark.MAX_WATERMARK, inputIndex);
            }
            harness.waitForTaskCompletion();

            Object[] expectedOutput = new Object[]{Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)};
            Assertions.assertThat(harness.getOutput()).containsExactly(expectedOutput);
        }
    }

    /**
     * Triggers a terminating savepoint (canonical format, aligned without timeout) with id 2 on a
     * three-input task restored from {@code TaskStateSnapshot.FINISHED_ON_RESTORE}. As soon as the
     * checkpoint is acknowledged the task is notified of its completion; the test then asserts
     * that the trigger future completed with {@code true} and that the acknowledgement happened.
     */
    @TestTemplate
    void testTriggeringStopWithSavepointWithDrain() throws Exception {
        // NOTE(review): this factory is constructed but never handed to the harness builder below.
        SourceOperatorFactory sourceOperatorFactory = new SourceOperatorFactory((Source)new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2), WatermarkStrategy.noWatermarks());

        final CompletableFuture acknowledged = new CompletableFuture();
        // Responder that additionally completes the future on every acknowledgement.
        TestCheckpointResponder ackSignallingResponder = new TestCheckpointResponder(){

            @Override
            public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot subtaskState) {
                super.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, subtaskState);
                acknowledged.complete(null);
            }
        };

        try (StreamTaskMailboxTestHarness harness =
                ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .setCollectNetworkEvents()
                        .modifyStreamConfig(config -> config.setCheckpointingEnabled(true))
                        .modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse))
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                        .setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE)
                        .setupOperatorChain((StreamOperatorFactory<?>)new MultipleInputStreamTaskChainedSourcesCheckpointingTest.LifeCycleMonitorMultipleInputOperatorFactory())
                        .finishForSingletonOperatorChain((TypeSerializer)StringSerializer.INSTANCE))
                        .setCheckpointResponder((CheckpointResponder)ackSignallingResponder)
                        .build()) {
            CompletableFuture triggerOutcome = harness.streamTask.triggerCheckpointAsync(
                    new CheckpointMetaData(2L, 2L),
                    CheckpointOptions.alignedNoTimeout(
                            (SnapshotType)SavepointType.terminate((SavepointFormatType)SavepointFormatType.CANONICAL),
                            (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
            // Confirm checkpoint 2 to the task once the acknowledgement has been observed.
            acknowledged.whenComplete((ignored, exception) -> harness.streamTask.notifyCheckpointCompleteAsync(2L));

            harness.waitForTaskCompletion();
            harness.finishProcessing();

            Assertions.assertThat((CompletableFuture)triggerOutcome).isCompletedWithValue((Object)true);
            Assertions.assertThat(acknowledged).isDone();
        }
    }

    /**
     * Returns a configuration callback that switches object reuse on or off.
     *
     * @param objectReuse {@code true} to call {@code enableObjectReuse()} on the config,
     *     {@code false} to call {@code disableObjectReuse()}
     * @return consumer applying the chosen setting to an {@code ExecutionConfig}
     */
    static Consumer<ExecutionConfig> applyObjectReuse(boolean objectReuse) {
        if (objectReuse) {
            return executionConfig -> executionConfig.enableObjectReuse();
        }
        return executionConfig -> executionConfig.disableObjectReuse();
    }

    /**
     * Builds the default test harness with unaligned checkpoints switched off.
     *
     * @param objectReuse whether object reuse is enabled in the execution config
     * @return the harness produced by the two-argument overload with {@code unaligned = false}
     * @throws Exception if building the harness fails
     */
    static StreamTaskMailboxTestHarness<String> buildTestHarness(boolean objectReuse) throws Exception {
        final boolean unaligned = false;
        return MultipleInputStreamTaskTest.buildTestHarness(unaligned, objectReuse);
    }

    /**
     * Builds a harness for a three-input task: a string network input, a chained bounded
     * {@code MockSource} producing integers, and a double network input, all feeding a
     * {@code MapToStringMultipleInputOperatorFactory(3)} operator.
     *
     * @param unaligned value passed to {@code setUnalignedCheckpointsEnabled}
     * @param objectReuse whether object reuse is enabled in the execution config
     * @return the built harness; the aligned checkpoint timeout is set to {@code Duration.ZERO}
     * @throws Exception if building the harness fails
     */
    static StreamTaskMailboxTestHarness<String> buildTestHarness(boolean unaligned, boolean objectReuse) throws Exception {
        return new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                .modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(objectReuse))
                .modifyStreamConfig(config -> config.setUnalignedCheckpointsEnabled(unaligned))
                .modifyStreamConfig(config -> config.setAlignedCheckpointTimeout(Duration.ZERO))
                .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO)
                .addSourceInput(
                        new SourceOperatorFactory((Source)new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()),
                        BasicTypeInfo.INT_TYPE_INFO)
                .addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO)
                .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3))
                .build();
    }

    /**
     * Adds a split holding the given records to a chained source, using
     * {@code Boundedness.BOUNDED}.
     *
     * @param testHarness harness whose task owns the source
     * @param sourceId index of the source input within the task's input configuration
     * @param records values to put into the split
     * @throws Exception if dispatching the split event fails
     */
    static void addSourceRecords(StreamTaskMailboxTestHarness<String> testHarness, int sourceId, int ... records) throws Exception {
        final Boundedness defaultBoundedness = Boundedness.BOUNDED;
        MultipleInputStreamTaskTest.addSourceRecords(testHarness, sourceId, defaultBoundedness, records);
    }

    /**
     * Creates a {@code MockSourceSplit} containing the given records and dispatches it to the
     * source operator as an {@code AddSplitEvent}.
     *
     * @param testHarness harness whose task owns the source
     * @param sourceId index of the source input within the task's input configuration
     * @param boundedness for {@code BOUNDED} the split's third constructor argument is the number
     *     of records, otherwise {@code Integer.MAX_VALUE}
     * @param records values to put into the split
     * @throws Exception if dispatching the split event fails
     */
    static void addSourceRecords(StreamTaskMailboxTestHarness<String> testHarness, int sourceId, Boundedness boundedness, int ... records) throws Exception {
        OperatorID targetOperator = MultipleInputStreamTaskTest.getSourceOperatorID(testHarness, sourceId);

        // Presumably the split's end index; a bounded split is limited to the supplied records.
        int splitLimit;
        if (boundedness == Boundedness.BOUNDED) {
            splitLimit = records.length;
        } else {
            splitLimit = Integer.MAX_VALUE;
        }

        MockSourceSplit split = new MockSourceSplit(0, 0, splitLimit);
        for (int i = 0; i < records.length; ++i) {
            split.addRecord(records[i]);
        }

        AddSplitEvent splitEvent = new AddSplitEvent(Collections.singletonList(split), (SimpleVersionedSerializer)new MockSourceSplitSerializer());
        SerializedValue serializedEvent = new SerializedValue((Object)splitEvent);
        testHarness.getStreamTask().dispatchOperatorEvent(targetOperator, serializedEvent);
    }

    /**
     * Builds a harness for watermark tests: a string network input, a chained unbounded
     * {@code MockSource} whose watermark strategy uses {@code RecordToWatermarkGenerator}, and a
     * double network input.
     *
     * @param inputChannels number of channels for each of the two network inputs
     * @param readerMarkIdleOnNoSplits forwarded as the fourth {@code MockSource} constructor
     *     argument
     * @return the built harness
     * @throws Exception if building the harness fails
     */
    private StreamTaskMailboxTestHarness<String> buildWatermarkTestHarness(int inputChannels, boolean readerMarkIdleOnNoSplits) throws Exception {
        return new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                .modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse))
                .addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, inputChannels)
                .addSourceInput(
                        new SourceOperatorFactory(
                                (Source)new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2, true, readerMarkIdleOnNoSplits),
                                WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new RecordToWatermarkGenerator())),
                        BasicTypeInfo.INT_TYPE_INFO)
                .addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO, inputChannels)
                .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3))
                .build();
    }

    /**
     * Looks up the operator id of a chained source input.
     *
     * @param testHarness harness whose task owns the source
     * @param sourceId index into the task's input configuration; the entry at that index is cast
     *     to {@code StreamConfig.SourceInputConfig}
     * @return the operator id of the source task input registered for that configuration
     */
    private static OperatorID getSourceOperatorID(StreamTaskMailboxTestHarness<String> testHarness, int sourceId) {
        ClassLoader harnessClassLoader = testHarness.getClass().getClassLoader();
        StreamConfig.InputConfig[] inputConfigs = testHarness.getStreamTask().getConfiguration().getInputs(harnessClassLoader);
        StreamConfig.SourceInputConfig sourceConfig = (StreamConfig.SourceInputConfig)inputConfigs[sourceId];
        return testHarness.getStreamTask().operatorChain.getSourceTaskInput(sourceConfig).getOperatorID();
    }

    /**
     * Dispatches a {@code NoMoreSplitsEvent} to the given chained source.
     *
     * @param testHarness harness whose task owns the source
     * @param sourceId index of the source input within the task's input configuration
     * @throws Exception if dispatching the event fails
     */
    private void finishAddingRecords(StreamTaskMailboxTestHarness<String> testHarness, int sourceId) throws Exception {
        OperatorID targetOperator = MultipleInputStreamTaskTest.getSourceOperatorID(testHarness, sourceId);
        SerializedValue noMoreSplits = new SerializedValue((Object)new NoMoreSplitsEvent());
        testHarness.getStreamTask().dispatchOperatorEvent(targetOperator, noMoreSplits);
    }

    /**
     * Checks the task-level record counters for an operator chain with side outputs.
     *
     * <p>The chain is a pass-through operator, followed by {@code OddEvenOperator} (two
     * non-chained outputs for the {@code "odd"} tag plus one untagged output), followed by
     * {@code DuplicatingOperator} (one non-chained output). Five odd, three even and two
     * consecutive integers (0 and 1) are fed in. The test asserts that the records-in counter
     * equals the number of input records, and that the records-out counter equals the odd-tagged
     * count plus the untagged count plus twice the total for the duplicating operator.
     *
     * <p>NOTE: this source was produced by a decompiler, which emitted the warning "Removed try
     * catching itself - possible behaviour change." for this method.
     */
    @TestTemplate
    void testTaskSideOutputStatistics() throws Exception {
        TaskMetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();

        ResultPartitionWriter[] writers = new ResultPartitionWriter[3];
        for (int index = 0; index < writers.length; ++index) {
            writers[index] = new RecordOrEventCollectingResultPartitionWriter(new ArrayDeque(), (TypeSerializer)new StreamElementSerializer(BasicTypeInfo.INT_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl())));
            writers[index].setup();
        }

        try (StreamTaskMailboxTestHarness harness =
                ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
                        .modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse))
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO)
                        .addAdditionalOutput(writers)
                        .setupOperatorChain((StreamOperatorFactory<?>)new PassThroughOperatorFactory())
                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                        .setOperatorFactory((StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)new OneInputStreamTaskTest.OddEvenOperator()))
                        .addNonChainedOutputsCount(new OutputTag("odd", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO), 2)
                        .addNonChainedOutputsCount(1)
                        .build()
                        .chain(BasicTypeInfo.INT_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()))
                        .setOperatorFactory((StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)new OneInputStreamTaskTest.DuplicatingOperator()))
                        .addNonChainedOutputsCount(1)
                        .build()
                        .finish())
                        .setTaskMetricGroup(metricGroup)
                        .build()) {
            Counter recordsIn = metricGroup.getIOMetricGroup().getNumRecordsInCounter();
            Counter recordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();

            final int oddInputs = 5;
            final int evenInputs = 3;
            final int naturalInputs = 2;

            // 1, 3, 5, ...
            for (int k = 0; k < oddInputs; ++k) {
                harness.processElement(new StreamRecord((Object)(k * 2 + 1)));
            }
            // 0, 2, 4, ...
            for (int k = 0; k < evenInputs; ++k) {
                harness.processElement(new StreamRecord((Object)(k * 2)));
            }
            // 0, 1, ... - alternating even and odd, starting with an even value.
            for (int k = 0; k < naturalInputs; ++k) {
                harness.processElement(new StreamRecord((Object)k));
            }

            // Of the consecutive integers, the rounded-down half is odd and the rounded-up half even.
            int oddTotal = oddInputs + naturalInputs / 2;
            int evenTotal = evenInputs + (int)Math.ceil((double)naturalInputs / 2.0);
            int allRecords = oddTotal + evenTotal;

            int emittedWithOddTag = oddTotal;
            int emittedWithoutTag = allRecords;
            int emittedByDuplicator = allRecords * 2;

            Assertions.assertThat((long)recordsIn.getCount()).isEqualTo((long)allRecords);
            Assertions.assertThat((long)recordsOut.getCount()).isEqualTo((long)(emittedWithOddTag + emittedWithoutTag + emittedByDuplicator));
            harness.waitForTaskCompletion();
        }
        finally {
            for (ResultPartitionWriter writer : writers) {
                writer.close();
            }
        }
    }

    private static class CopyProxySerializer
    extends TypeSerializer<Integer> {
        SharedReference<List<Integer>> copiedElementsRef;

        public CopyProxySerializer(SharedReference<List<Integer>> copiedElementsRef) {
            this.copiedElementsRef = copiedElementsRef;
        }

        public boolean isImmutableType() {
            return false;
        }

        public TypeSerializer<Integer> duplicate() {
            return new CopyProxySerializer(this.copiedElementsRef);
        }

        public Integer createInstance() {
            return IntSerializer.INSTANCE.createInstance();
        }

        public Integer copy(Integer from) {
            this.copiedElementsRef.applySync(list -> list.add(from));
            return IntSerializer.INSTANCE.copy(from);
        }

        public Integer copy(Integer from, Integer reuse) {
            this.copiedElementsRef.applySync(list -> list.add(from));
            return IntSerializer.INSTANCE.copy(from, reuse);
        }

        public int getLength() {
            return IntSerializer.INSTANCE.getLength();
        }

        public void serialize(Integer record, DataOutputView target) throws IOException {
            IntSerializer.INSTANCE.serialize(record, target);
        }

        public Integer deserialize(DataInputView source) throws IOException {
            return IntSerializer.INSTANCE.deserialize(source);
        }

        public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
            return IntSerializer.INSTANCE.deserialize(reuse, source);
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
            throw new UnsupportedOperationException();
        }

        public boolean equals(Object obj) {
            throw new UnsupportedOperationException();
        }

        public int hashCode() {
            throw new UnsupportedOperationException();
        }

        public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
            throw new UnsupportedOperationException();
        }
    }

    private static class RecordToWatermarkGenerator
    implements WatermarkGenerator<Integer>,
    Serializable {
        private RecordToWatermarkGenerator() {
        }

        public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
            output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark((long)event.intValue()));
        }

        public void onPeriodicEmit(WatermarkOutput output) {
        }
    }

    static class LifeCycleTrackingMap<T>
    extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T>,
    BoundedOneInput {
        public static final String OPEN = "LifeCycleTrackingMap#open";
        public static final String CLOSE = "LifeCycleTrackingMap#close";
        public static final String END_INPUT = "LifeCycleTrackingMap#endInput";

        LifeCycleTrackingMap() {
        }

        public void processElement(StreamRecord<T> element) throws Exception {
            this.output.collect(element);
        }

        public void open() throws Exception {
            LIFE_CYCLE_EVENTS.add(OPEN);
            super.open();
        }

        public void close() throws Exception {
            LIFE_CYCLE_EVENTS.add(CLOSE);
            super.close();
        }

        public void endInput() throws Exception {
            LIFE_CYCLE_EVENTS.add(END_INPUT);
        }
    }

    static class LifeCycleTrackingMockSourceReader
    extends MockSourceReader {
        public static final String START = "SourceReader#start";
        public static final String CLOSE = "SourceReader#close";

        LifeCycleTrackingMockSourceReader() {
        }

        public void start() {
            LIFE_CYCLE_EVENTS.add(START);
            super.start();
        }

        public void close() throws Exception {
            LIFE_CYCLE_EVENTS.add(CLOSE);
            super.close();
        }
    }

    static class LifeCycleTrackingMockSource
    extends MockSource {
        public LifeCycleTrackingMockSource(Boundedness boundedness, int numSplits) {
            super(boundedness, numSplits);
        }

        public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
            LifeCycleTrackingMockSourceReader sourceReader = new LifeCycleTrackingMockSourceReader();
            this.createdReaders.add(sourceReader);
            return sourceReader;
        }
    }

    static class LifeCycleTrackingMapToStringMultipleInputOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        LifeCycleTrackingMapToStringMultipleInputOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new LifeCycleTrackingMapToStringMultipleInputOperator(parameters));
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return LifeCycleTrackingMapToStringMultipleInputOperator.class;
        }
    }

    /**
     * Three-input {@code MapToStringMultipleInputOperator} that appends a marker to
     * {@code LIFE_CYCLE_EVENTS} on open, end of input, finish and close.
     *
     * <p>{@code open} and {@code close} delegate to the superclass after recording the marker;
     * {@code endInput} and {@code finish} only record it.
     */
    static class LifeCycleTrackingMapToStringMultipleInputOperator
    extends MapToStringMultipleInputOperator
    implements BoundedMultiInput {
        public static final String OPEN = "MultipleInputOperator#open";
        public static final String CLOSE = "MultipleInputOperator#close";
        public static final String FINISH = "MultipleInputOperator#finish";
        public static final String END_INPUT = "MultipleInputOperator#endInput";
        private static final long serialVersionUID = 1L;

        public LifeCycleTrackingMapToStringMultipleInputOperator(StreamOperatorParameters<String> parameters) {
            super(parameters, 3);
        }

        /** Records {@link #OPEN}, then opens the operator. */
        @Override
        public void open() throws Exception {
            LIFE_CYCLE_EVENTS.add(OPEN);
            super.open();
        }

        /** Records {@link #END_INPUT}; the same marker is used for every input id. */
        @Override
        public void endInput(int inputId) {
            LIFE_CYCLE_EVENTS.add(END_INPUT);
        }

        /** Records {@link #FINISH} without delegating to the superclass. */
        @Override
        public void finish() throws Exception {
            LIFE_CYCLE_EVENTS.add(FINISH);
        }

        /** Records {@link #CLOSE}, then closes the operator. */
        @Override
        public void close() throws Exception {
            LIFE_CYCLE_EVENTS.add(CLOSE);
            super.close();
        }
    }

    /** Operator factory that produces {@link OddEvenOperator} instances. */
    private static class OddEvenOperatorFactory
            extends AbstractStreamOperatorFactory<Integer> {

        private OddEvenOperatorFactory() {
        }

        /**
         * Instantiates a new {@link OddEvenOperator} from the given parameters.
         *
         * @param parameters operator parameters passed on to the operator constructor
         * @return the new operator, cast to the caller's requested operator type
         */
        @SuppressWarnings("unchecked")
        public <T extends StreamOperator<Integer>> T createStreamOperator(StreamOperatorParameters<Integer> parameters) {
            final OddEvenOperator operator = new OddEvenOperator(parameters);
            return (T) operator;
        }

        /**
         * Reports the operator class this factory instantiates.
         *
         * @param classLoader not consulted by this implementation
         * @return {@code OddEvenOperator.class}
         */
        public Class<? extends StreamOperator<Integer>> getStreamOperatorClass(ClassLoader classLoader) {
            return OddEvenOperator.class;
        }
    }

    /**
     * Three-input operator whose inputs send each integer record to an "even" or "odd" side output
     * and additionally to the main output.
     */
    static class OddEvenOperator
            extends AbstractStreamOperatorV2<Integer>
            implements MultipleInputStreamOperator<Integer> {

        public OddEvenOperator(StreamOperatorParameters<Integer> parameters) {
            super(parameters, 3);
        }

        /**
         * Creates the operator's inputs.
         *
         * @return a fixed-size list holding three {@link OddEvenInput}s with input ids 1, 2 and 3
         */
        public List<Input> getInputs() {
            return Arrays.<Input>asList(
                    new OddEvenInput(this, 1),
                    new OddEvenInput(this, 2),
                    new OddEvenInput(this, 3));
        }

        /** Input that routes records by the parity of their value. */
        static class OddEvenInput
                extends AbstractInput<Integer, Integer> {
            private final OutputTag<Integer> oddOutputTag =
                    new OutputTag<>("odd", BasicTypeInfo.INT_TYPE_INFO);
            private final OutputTag<Integer> evenOutputTag =
                    new OutputTag<>("even", BasicTypeInfo.INT_TYPE_INFO);

            public OddEvenInput(AbstractStreamOperatorV2<Integer> owner, int inputId) {
                super(owner, inputId);
            }

            /**
             * Emits the record on the side output matching its parity, then on the main output.
             *
             * @param element the incoming record; its value decides which side output is used
             */
            public void processElement(StreamRecord<Integer> element) throws Exception {
                final boolean even = element.getValue() % 2 == 0;
                final OutputTag<Integer> sideOutput = even ? evenOutputTag : oddOutputTag;
                output.collect(sideOutput, element);
                // Every record also goes to the regular output, regardless of parity.
                output.collect(element);
            }
        }
    }

    /**
     * Operator factory that produces {@link PassThroughOperator} instances.
     *
     * @param <T> element type of the created operator
     */
    private static class PassThroughOperatorFactory<T>
            extends AbstractStreamOperatorFactory<T> {

        private PassThroughOperatorFactory() {
        }

        /**
         * Instantiates a new {@link PassThroughOperator} from the given parameters.
         *
         * @param parameters operator parameters passed on to the operator constructor
         * @return the new operator, cast to the caller's requested operator type
         */
        @SuppressWarnings("unchecked")
        public <O extends StreamOperator<T>> O createStreamOperator(StreamOperatorParameters<T> parameters) {
            final PassThroughOperator<T> operator = new PassThroughOperator<T>(parameters);
            return (O) operator;
        }

        /**
         * Reports the operator class this factory instantiates.
         *
         * @param classLoader not consulted by this implementation
         * @return {@code PassThroughOperator.class}
         */
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return PassThroughOperator.class;
        }
    }

    /**
     * Three-input operator whose inputs forward each received record unchanged.
     *
     * @param <T> element type of both the inputs and the output
     */
    static class PassThroughOperator<T>
            extends AbstractStreamOperatorV2<T>
            implements MultipleInputStreamOperator<T> {

        public PassThroughOperator(StreamOperatorParameters<T> parameters) {
            super(parameters, 3);
        }

        /**
         * Creates the operator's inputs.
         *
         * @return a fixed-size list holding three {@link PassThroughInput}s with input ids 1, 2 and 3
         */
        public List<Input> getInputs() {
            return Arrays.<Input>asList(
                    new PassThroughInput<>(this, 1),
                    new PassThroughInput<>(this, 2),
                    new PassThroughInput<>(this, 3));
        }

        /**
         * Input that emits exactly the record it was given.
         *
         * @param <I> element type handled by the input
         */
        static class PassThroughInput<I>
                extends AbstractInput<I, I> {

            public PassThroughInput(AbstractStreamOperatorV2<I> owner, int inputId) {
                super(owner, inputId);
            }

            /**
             * Forwards the record to the output as-is.
             *
             * @param element the record to forward
             */
            public void processElement(StreamRecord<I> element) throws Exception {
                output.collect(element);
            }
        }
    }

    /**
     * Operator factory that produces {@link MapToStringMultipleInputOperator} instances configured
     * with a fixed number of inputs and an emit-on-finish flag.
     */
    protected static class MapToStringMultipleInputOperatorFactory
            extends AbstractStreamOperatorFactory<String> {
        private final int numberOfInputs;
        private final boolean emitOnFinish;

        /**
         * Creates a factory whose operators have {@code emitOnFinish} set to {@code false}.
         *
         * @param numberOfInputs number of inputs handed to each created operator
         */
        public MapToStringMultipleInputOperatorFactory(int numberOfInputs) {
            this(numberOfInputs, false);
        }

        /**
         * Creates a factory with both settings given explicitly.
         *
         * @param numberOfInputs number of inputs handed to each created operator
         * @param emitOnFinish flag handed to each created operator
         */
        public MapToStringMultipleInputOperatorFactory(int numberOfInputs, boolean emitOnFinish) {
            this.numberOfInputs = numberOfInputs;
            this.emitOnFinish = emitOnFinish;
        }

        /**
         * Instantiates a new {@link MapToStringMultipleInputOperator} using this factory's settings.
         *
         * @param parameters operator parameters passed on to the operator constructor
         * @return the new operator, cast to the caller's requested operator type
         */
        @SuppressWarnings("unchecked")
        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            final MapToStringMultipleInputOperator operator =
                    new MapToStringMultipleInputOperator(parameters, numberOfInputs, emitOnFinish);
            return (T) operator;
        }

        /**
         * Reports the operator class this factory instantiates.
         *
         * @param classLoader not consulted by this implementation
         * @return {@code MapToStringMultipleInputOperator.class}
         */
        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return MapToStringMultipleInputOperator.class;
        }
    }

    /**
     * Multiple-input test operator that emits the {@code toString()} of every incoming value and
     * asserts that {@code open()}, element processing and {@code close()} happen in a valid order.
     *
     * <p>At most four inputs are supported (see {@link #getInputs()}). When constructed with
     * {@code emitOnFinish == true}, {@link #finish()} additionally emits a {@code "FINISH"} record.
     */
    protected static class MapToStringMultipleInputOperator
            extends AbstractStreamOperatorV2<String>
            implements MultipleInputStreamOperator<String> {
        private static final long serialVersionUID = 1L;

        private final int numberOfInputs;
        private final boolean emitOnFinish;
        /** Set once {@link #open()} has completed its checks. */
        private boolean openCalled;
        /** Set once {@link #close()} has completed its checks. */
        private boolean closeCalled;

        /**
         * Creates an operator that does not emit anything on finish.
         *
         * @param parameters operator parameters passed to the superclass
         * @param numberOfInputs number of inputs this operator exposes
         */
        public MapToStringMultipleInputOperator(StreamOperatorParameters<String> parameters, int numberOfInputs) {
            this(parameters, numberOfInputs, false);
        }

        /**
         * Creates an operator with both settings given explicitly.
         *
         * @param parameters operator parameters passed to the superclass
         * @param numberOfInputs number of inputs this operator exposes
         * @param emitOnFinish whether {@link #finish()} emits a {@code "FINISH"} record
         */
        public MapToStringMultipleInputOperator(StreamOperatorParameters<String> parameters, int numberOfInputs, boolean emitOnFinish) {
            super(parameters, numberOfInputs);
            this.numberOfInputs = numberOfInputs;
            this.emitOnFinish = emitOnFinish;
        }

        /**
         * Opens the operator and verifies the call order: {@code close()} must not have run yet
         * and {@code open()} must not have run before.
         */
        public void open() throws Exception {
            super.open();
            // The close-before-open description has to guard closeCalled; a repeated open() is a
            // separate violation and gets its own, accurate description.
            Assertions.assertThat(this.closeCalled).as("Close called before open.").isFalse();
            Assertions.assertThat(this.openCalled).as("Open called more than once.").isFalse();
            this.openCalled = true;
        }

        /** Emits a single {@code "FINISH"} record if the operator was configured to do so. */
        public void finish() throws Exception {
            if (this.emitOnFinish) {
                this.output.collect(new StreamRecord<>("FINISH"));
            }
        }

        /** Closes the operator and verifies that {@link #open()} ran beforehand. */
        public void close() throws Exception {
            super.close();
            Assertions.assertThat(this.openCalled).as("Open was not called before close.").isTrue();
            this.closeCalled = true;
        }

        /**
         * Creates the operator's inputs.
         *
         * @return the first {@code numberOfInputs} of four {@link MapToStringInput}s with ids 1 to 4
         * @throws IllegalArgumentException if {@code numberOfInputs} is greater than four
         *     (presumably the type thrown by {@code Preconditions.checkArgument}; confirm)
         */
        public List<Input> getInputs() {
            Preconditions.checkArgument(this.numberOfInputs <= 4);
            return Arrays.<Input>asList(
                            new MapToStringInput<>(this, 1),
                            new MapToStringInput<>(this, 2),
                            new MapToStringInput<>(this, 3),
                            new MapToStringInput<>(this, 4))
                    .subList(0, this.numberOfInputs);
        }

        /** @return {@code true} once {@link #close()} has completed its checks */
        public boolean wasCloseCalled() {
            return this.closeCalled;
        }

        /**
         * Input that converts each value to its string form. It is an inner (non-static) class
         * because it reads the enclosing operator's {@code openCalled} flag.
         *
         * @param <T> type of the incoming values
         */
        public class MapToStringInput<T>
                extends AbstractInput<T, String> {

            public MapToStringInput(AbstractStreamOperatorV2<String> owner, int inputId) {
                super(owner, inputId);
            }

            /**
             * Asserts the operator was opened, then emits {@code element.getValue().toString()},
             * carrying the timestamp over when the incoming record has one.
             *
             * @param element the incoming record
             */
            public void processElement(StreamRecord<T> element) throws Exception {
                Assertions.assertThat(MapToStringMultipleInputOperator.this.openCalled)
                        .as("Open was not called before run.")
                        .isTrue();
                if (element.hasTimestamp()) {
                    this.output.collect(new StreamRecord<>(element.getValue().toString(), element.getTimestamp()));
                } else {
                    this.output.collect(new StreamRecord<>(element.getValue().toString()));
                }
            }
        }
    }

    /** Three-input operator whose inputs emit every received record twice. */
    static class DuplicatingOperator
            extends AbstractStreamOperatorV2<String>
            implements MultipleInputStreamOperator<String> {

        public DuplicatingOperator(StreamOperatorParameters<String> parameters) {
            super(parameters, 3);
        }

        /**
         * Creates the operator's inputs.
         *
         * @return a fixed-size list holding three {@link DuplicatingInput}s with input ids 1, 2 and 3
         */
        public List<Input> getInputs() {
            return Arrays.<Input>asList(
                    new DuplicatingInput(this, 1),
                    new DuplicatingInput(this, 2),
                    new DuplicatingInput(this, 3));
        }

        /** Input that forwards each record to the output two times in a row. */
        class DuplicatingInput
                extends AbstractInput<String, String> {

            public DuplicatingInput(AbstractStreamOperatorV2<String> owner, int inputId) {
                super(owner, inputId);
            }

            /**
             * Emits the same record instance twice.
             *
             * @param element the record to duplicate
             */
            public void processElement(StreamRecord<String> element) throws Exception {
                for (int copy = 0; copy < 2; copy++) {
                    output.collect(element);
                }
            }
        }
    }
}

