/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class StreamGraphGeneratorBatchExecutionTest
extends TestLogger {
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    private static final KeyedProcessFunction<Integer, Integer, Integer> DUMMY_PROCESS_FUNCTION = new KeyedProcessFunction<Integer, Integer, Integer>(){

        public void processElement(Integer value, KeyedProcessFunction.Context ctx, Collector<Integer> out) {
        }
    };
    private static final KeyedCoProcessFunction<Integer, Integer, Integer, Integer> DUMMY_KEYED_CO_PROCESS_FUNCTION = new KeyedCoProcessFunction<Integer, Integer, Integer, Integer>(){

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<Integer> out) {
        }

        public void processElement2(Integer value, KeyedCoProcessFunction.Context ctx, Collector<Integer> out) {
        }
    };

    @Test
    public void testBatchJobType() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator process = env.fromElements((Object[])new Integer[]{1, 2}).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        DataStreamSink sink = process.addSink((SinkFunction)new DiscardingSink());
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig());
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        StreamGraph graph = graphGenerator.generate();
        Assert.assertThat((Object)graph.getJobType(), (Matcher)CoreMatchers.is((Object)JobType.BATCH));
    }

    @Test
    public void testOneInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator process = env.fromElements((Object[])new Integer[]{1, 2}).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        DataStreamSink sink = process.addSink((SinkFunction)new DiscardingSink());
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig());
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        StreamGraph graph = graphGenerator.generate();
        StreamNode processNode = graph.getStreamNode(Integer.valueOf(process.getId()));
        Assert.assertThat(processNode.getInputRequirements().get(0), (Matcher)CoreMatchers.equalTo((Object)StreamConfig.InputRequirement.SORTED));
        Assert.assertThat((Object)processNode.getOperatorFactory().getChainingStrategy(), (Matcher)CoreMatchers.equalTo((Object)ChainingStrategy.HEAD));
        Assert.assertThat((Object)graph.getStateBackend(), (Matcher)CoreMatchers.instanceOf(BatchExecutionStateBackend.class));
        Assert.assertThat((Object)graph.getTimerServiceProvider(), (Matcher)CoreMatchers.notNullValue());
    }

    @Test
    public void testDisablingStateBackendOneInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator process = env.fromElements((Object[])new Integer[]{1, 2}).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        DataStreamSink sink = process.addSink((SinkFunction)new DiscardingSink());
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, (Object)false);
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig(), (ReadableConfig)configuration);
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        StreamGraph graph = graphGenerator.generate();
        StreamNode processNode = graph.getStreamNode(Integer.valueOf(process.getId()));
        Assert.assertThat(processNode.getInputRequirements().get(0), (Matcher)CoreMatchers.equalTo((Object)StreamConfig.InputRequirement.SORTED));
        Assert.assertThat((Object)processNode.getOperatorFactory().getChainingStrategy(), (Matcher)CoreMatchers.equalTo((Object)ChainingStrategy.HEAD));
        Assert.assertThat((Object)graph.getStateBackend(), (Matcher)CoreMatchers.nullValue());
        Assert.assertThat((Object)graph.getTimerServiceProvider(), (Matcher)CoreMatchers.nullValue());
    }

    @Test
    public void testDisablingSortingInputsOneInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator process = env.fromElements((Object[])new Integer[]{1, 2}).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        DataStreamSink sink = process.addSink((SinkFunction)new DiscardingSink());
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, (Object)false);
        configuration.set(ExecutionOptions.SORT_INPUTS, (Object)false);
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig(), (ReadableConfig)configuration);
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        StreamGraph graph = graphGenerator.generate();
        StreamNode processNode = graph.getStreamNode(Integer.valueOf(process.getId()));
        Assert.assertThat(processNode.getInputRequirements().get(0), (Matcher)CoreMatchers.nullValue());
        Assert.assertThat((Object)graph.getStateBackend(), (Matcher)CoreMatchers.nullValue());
        Assert.assertThat((Object)graph.getTimerServiceProvider(), (Matcher)CoreMatchers.nullValue());
    }

    @Test
    public void testDisablingSortingInputsWithoutBatchStateBackendOneInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator process = env.fromElements((Object[])new Integer[]{1, 2}).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        DataStreamSink sink = process.addSink((SinkFunction)new DiscardingSink());
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.SORT_INPUTS, (Object)false);
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig(), (ReadableConfig)configuration);
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Batch state backend requires the sorted inputs to be enabled!");
        graphGenerator.generate();
    }

    @Test
    public void testTwoInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource elements1 = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSource elements2 = env.fromElements((Object[])new Integer[]{1, 2});
        SingleOutputStreamOperator process = elements1.connect((DataStream)elements2).keyBy(Integer::intValue, Integer::intValue).process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        DataStreamSink sink = process.addSink((SinkFunction)new DiscardingSink());
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig());
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        StreamGraph graph = graphGenerator.generate();
        StreamNode processNode = graph.getStreamNode(Integer.valueOf(process.getId()));
        Assert.assertThat(processNode.getInputRequirements().get(0), (Matcher)CoreMatchers.equalTo((Object)StreamConfig.InputRequirement.SORTED));
        Assert.assertThat(processNode.getInputRequirements().get(1), (Matcher)CoreMatchers.equalTo((Object)StreamConfig.InputRequirement.SORTED));
        Assert.assertThat((Object)processNode.getOperatorFactory().getChainingStrategy(), (Matcher)CoreMatchers.equalTo((Object)ChainingStrategy.HEAD));
        Assert.assertThat((Object)graph.getStateBackend(), (Matcher)CoreMatchers.instanceOf(BatchExecutionStateBackend.class));
        Assert.assertThat((Object)graph.getTimerServiceProvider(), (Matcher)CoreMatchers.notNullValue());
    }

    @Test
    public void testDisablingStateBackendTwoInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource elements1 = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSource elements2 = env.fromElements((Object[])new Integer[]{1, 2});
        SingleOutputStreamOperator process = elements1.connect((DataStream)elements2).keyBy(Integer::intValue, Integer::intValue).process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        DataStreamSink sink = process.addSink((SinkFunction)new DiscardingSink());
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, (Object)false);
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig(), (ReadableConfig)configuration);
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        StreamGraph graph = graphGenerator.generate();
        StreamNode processNode = graph.getStreamNode(Integer.valueOf(process.getId()));
        Assert.assertThat(processNode.getInputRequirements().get(0), (Matcher)CoreMatchers.equalTo((Object)StreamConfig.InputRequirement.SORTED));
        Assert.assertThat(processNode.getInputRequirements().get(1), (Matcher)CoreMatchers.equalTo((Object)StreamConfig.InputRequirement.SORTED));
        Assert.assertThat((Object)processNode.getOperatorFactory().getChainingStrategy(), (Matcher)CoreMatchers.equalTo((Object)ChainingStrategy.HEAD));
        Assert.assertThat((Object)graph.getStateBackend(), (Matcher)CoreMatchers.nullValue());
        Assert.assertThat((Object)graph.getTimerServiceProvider(), (Matcher)CoreMatchers.nullValue());
    }

    @Test
    public void testDisablingSortingInputsTwoInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource elements1 = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSource elements2 = env.fromElements((Object[])new Integer[]{1, 2});
        SingleOutputStreamOperator process = elements1.connect((DataStream)elements2).keyBy(Integer::intValue, Integer::intValue).process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        DataStreamSink sink = process.addSink((SinkFunction)new DiscardingSink());
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, (Object)false);
        configuration.set(ExecutionOptions.SORT_INPUTS, (Object)false);
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig(), (ReadableConfig)configuration);
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        StreamGraph graph = graphGenerator.generate();
        StreamNode processNode = graph.getStreamNode(Integer.valueOf(process.getId()));
        Assert.assertThat(processNode.getInputRequirements().get(0), (Matcher)CoreMatchers.nullValue());
        Assert.assertThat(processNode.getInputRequirements().get(1), (Matcher)CoreMatchers.nullValue());
        Assert.assertThat((Object)graph.getStateBackend(), (Matcher)CoreMatchers.nullValue());
        Assert.assertThat((Object)graph.getTimerServiceProvider(), (Matcher)CoreMatchers.nullValue());
    }

    @Test
    public void testDisablingSortingInputsWithoutBatchStateBackendTwoInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource elements1 = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSource elements2 = env.fromElements((Object[])new Integer[]{1, 2});
        SingleOutputStreamOperator process = elements1.connect((DataStream)elements2).keyBy(Integer::intValue, Integer::intValue).process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        DataStreamSink sink = process.addSink((SinkFunction)new DiscardingSink());
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.SORT_INPUTS, (Object)false);
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig(), (ReadableConfig)configuration);
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Batch state backend requires the sorted inputs to be enabled!");
        graphGenerator.generate();
    }

    @Test
    public void testInputSelectableTwoInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource elements1 = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSource elements2 = env.fromElements((Object[])new Integer[]{1, 2});
        SingleOutputStreamOperator process = elements1.connect((DataStream)elements2).keyBy(Integer::intValue, Integer::intValue).process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        SingleOutputStreamOperator selectableOperator = process.connect((DataStream)elements1).keyBy(Integer::intValue, Integer::intValue).transform("operator", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (TwoInputStreamOperator)new InputSelectableTwoInputOperator());
        DataStreamSink sink = selectableOperator.addSink((SinkFunction)new DiscardingSink());
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig());
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Batch state backend and sorting inputs are not supported in graphs with an InputSelectable operator.");
        graphGenerator.generate();
    }

    @Test
    public void testMultiInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource elements1 = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSource elements2 = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSource elements3 = env.fromElements((Object[])new Integer[]{1, 2});
        MultipleInputOperatorFactory selectableOperator = new MultipleInputOperatorFactory(3, false);
        KeyedMultipleInputTransformation multipleInputTransformation = new KeyedMultipleInputTransformation("operator", (StreamOperatorFactory)selectableOperator, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        multipleInputTransformation.addInput(elements1.getTransformation(), (KeySelector & Serializable)e -> e);
        multipleInputTransformation.addInput(elements2.getTransformation(), (KeySelector & Serializable)e -> e);
        multipleInputTransformation.addInput(elements3.getTransformation(), (KeySelector & Serializable)e -> e);
        DataStreamSink sink = new MultipleConnectedStreams(env).transform((AbstractMultipleInputTransformation)multipleInputTransformation).addSink((SinkFunction)new DiscardingSink());
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig());
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        StreamGraph graph = graphGenerator.generate();
        StreamNode operatorNode = graph.getStreamNode(Integer.valueOf(multipleInputTransformation.getId()));
        Assert.assertThat(operatorNode.getInputRequirements().get(0), (Matcher)CoreMatchers.equalTo((Object)StreamConfig.InputRequirement.SORTED));
        Assert.assertThat(operatorNode.getInputRequirements().get(1), (Matcher)CoreMatchers.equalTo((Object)StreamConfig.InputRequirement.SORTED));
        Assert.assertThat((Object)operatorNode.getOperatorFactory().getChainingStrategy(), (Matcher)CoreMatchers.equalTo((Object)ChainingStrategy.HEAD));
        Assert.assertThat((Object)graph.getStateBackend(), (Matcher)CoreMatchers.instanceOf(BatchExecutionStateBackend.class));
        Assert.assertThat((Object)graph.getTimerServiceProvider(), (Matcher)CoreMatchers.notNullValue());
    }

    @Test
    public void testInputSelectableMultiInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource elements1 = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSource elements2 = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSource elements3 = env.fromElements((Object[])new Integer[]{1, 2});
        MultipleInputOperatorFactory selectableOperator = new MultipleInputOperatorFactory(3, true);
        KeyedMultipleInputTransformation multipleInputTransformation = new KeyedMultipleInputTransformation("operator", (StreamOperatorFactory)selectableOperator, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        multipleInputTransformation.addInput(elements1.getTransformation(), (KeySelector & Serializable)e -> e);
        multipleInputTransformation.addInput(elements2.getTransformation(), (KeySelector & Serializable)e -> e);
        multipleInputTransformation.addInput(elements3.getTransformation(), (KeySelector & Serializable)e -> e);
        DataStreamSink sink = new MultipleConnectedStreams(env).transform((AbstractMultipleInputTransformation)multipleInputTransformation).addSink((SinkFunction)new DiscardingSink());
        StreamGraphGenerator graphGenerator = new StreamGraphGenerator(Collections.singletonList(sink.getTransformation()), env.getConfig(), env.getCheckpointConfig());
        graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Batch state backend and sorting inputs are not supported in graphs with an InputSelectable operator.");
        graphGenerator.generate();
    }

    @Test
    public void testFeedbackThrowsExceptionInBatch() {
        SourceTransformation bounded = new SourceTransformation("Bounded Source", (Source)new MockSource(Boundedness.BOUNDED, 100), WatermarkStrategy.noWatermarks(), IntegerTypeInfo.of(Integer.class), 1);
        FeedbackTransformation feedbackTransformation = new FeedbackTransformation((Transformation)bounded, Long.valueOf(5L));
        this.testNoSupportForIterationsInBatchHelper(new Transformation[]{bounded, feedbackTransformation});
    }

    @Test
    public void testCoFeedbackThrowsExceptionInBatch() {
        CoFeedbackTransformation coFeedbackTransformation = new CoFeedbackTransformation(2, TypeInformation.of(Integer.TYPE), Long.valueOf(5L));
        this.testNoSupportForIterationsInBatchHelper(new Transformation[]{coFeedbackTransformation});
    }

    private void testNoSupportForIterationsInBatchHelper(Transformation<?> ... transformations) {
        ArrayList registeredTransformations = new ArrayList();
        Collections.addAll(registeredTransformations, transformations);
        StreamGraphGenerator streamGraphGenerator = new StreamGraphGenerator(registeredTransformations, new ExecutionConfig(), new CheckpointConfig());
        streamGraphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
        this.expectedException.expect(UnsupportedOperationException.class);
        this.expectedException.expectMessage("Iterations are not supported in BATCH execution mode.");
        streamGraphGenerator.generate();
    }

    private static final class MultipleInputOperatorFactory
    extends AbstractStreamOperatorFactory<Integer> {
        private final int inputsCount;
        private final boolean selectable;

        private MultipleInputOperatorFactory(int inputsCount, boolean selectable) {
            this.inputsCount = inputsCount;
            this.selectable = selectable;
        }

        public <T extends StreamOperator<Integer>> T createStreamOperator(StreamOperatorParameters<Integer> parameters) {
            if (this.selectable) {
                return (T)((Object)new InputSelectableMultipleInputOperator(parameters, this.inputsCount));
            }
            return (T)((Object)new MultipleInputOperator(parameters, this.inputsCount));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            if (this.selectable) {
                return InputSelectableMultipleInputOperator.class;
            }
            return MultipleInputOperator.class;
        }
    }

    private static final class InputSelectableMultipleInputOperator
    extends MultipleInputOperator
    implements InputSelectable {
        public InputSelectableMultipleInputOperator(StreamOperatorParameters<Integer> parameters, int inputsCount) {
            super(parameters, inputsCount);
        }

        public InputSelection nextSelection() {
            return null;
        }
    }

    private static class MultipleInputOperator
    extends AbstractStreamOperatorV2<Integer>
    implements MultipleInputStreamOperator<Integer> {
        public MultipleInputOperator(StreamOperatorParameters<Integer> parameters, int inputsCount) {
            super(parameters, inputsCount);
        }

        public List<Input> getInputs() {
            return Collections.emptyList();
        }
    }

    private static final class InputSelectableTwoInputOperator
    extends AbstractStreamOperator<Integer>
    implements TwoInputStreamOperator<Integer, Integer, Integer>,
    InputSelectable {
        private InputSelectableTwoInputOperator() {
        }

        public InputSelection nextSelection() {
            return null;
        }

        public void processElement1(StreamRecord<Integer> element) {
        }

        public void processElement2(StreamRecord<Integer> element) {
        }
    }
}

