/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests that build an {@link ExecutionGraph} from {@link JobVertex} topologies and verify the
 * resulting execution job vertices, intermediate results and consumed partition groups.
 *
 * <p>Graphs are created either through {@code build(...)} or {@code buildDynamicGraph(...)} of
 * {@link TestingDefaultExecutionGraphBuilder}; the tests whose names start with
 * {@code testDynamicGraph} (and {@code testAttachToDynamicGraph}) use the latter.
 */
class DefaultExecutionGraphConstructionTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /** Metric group passed to every {@code attachJobGraph} call in this class. */
    private static final JobManagerJobMetricGroup JOB_MANAGER_JOB_METRIC_GROUP = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();

    /**
     * Builds an execution graph via {@code build(...)}, using a parallelism store computed from
     * the given vertices. The vertices are not attached yet.
     */
    private ExecutionGraph createDefaultExecutionGraph(List<JobVertex> vertices) throws Exception {
        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setVertexParallelismStore(SchedulerBase.computeVertexParallelismStore(vertices))
                .build(EXECUTOR_RESOURCE.getExecutor());
    }

    /**
     * Builds an execution graph via {@code buildDynamicGraph(...)}, using a parallelism store
     * computed from the given vertices. The vertices are not attached yet.
     */
    private ExecutionGraph createDynamicExecutionGraph(List<JobVertex> vertices) throws Exception {
        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setVertexParallelismStore(SchedulerBase.computeVertexParallelismStore(vertices))
                .buildDynamicGraph(EXECUTOR_RESOURCE.getExecutor());
    }

    /** Creates a vertex with the given name and parallelism and {@link AbstractInvokable} as invokable class. */
    private static JobVertex createVertex(String name, int parallelism) {
        JobVertex vertex = new JobVertex(name);
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(AbstractInvokable.class);
        return vertex;
    }

    /**
     * Creates the five-vertex topology shared by several tests and returns the vertices as
     * {@code [v1, v2, v3, v4, v5]}.
     *
     * <p>Edges (all ALL_TO_ALL / PIPELINED): v1 -> v2, v2 -> v4, v3 -> v4, v4 -> v5, v3 -> v5.
     * Parallelisms are 5, 7, 2, 11 and 4 respectively.
     */
    private static List<JobVertex> createFiveVertexTopology() {
        JobVertex v1 = createVertex("vertex1", 5);
        JobVertex v2 = createVertex("vertex2", 7);
        JobVertex v3 = createVertex("vertex3", 2);
        JobVertex v4 = createVertex("vertex4", 11);
        JobVertex v5 = createVertex("vertex5", 4);
        JobVertexConnectionUtils.connectNewDataSetAsInput(v2, v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(v4, v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(v4, v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(v5, v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(v5, v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        return Arrays.asList(v1, v2, v3, v4, v5);
    }

    /**
     * Creates a "source" and a "sink" vertex connected by a BLOCKING data set and returns them as
     * a mutable list {@code [source, sink]}. No invokable class is set on either vertex.
     */
    private static List<JobVertex> createSourceAndSink(int sourceParallelism, int sinkParallelism, DistributionPattern pattern) {
        JobVertex source = new JobVertex("source");
        JobVertex sink = new JobVertex("sink");
        source.setParallelism(sourceParallelism);
        sink.setParallelism(sinkParallelism);
        JobVertexConnectionUtils.connectNewDataSetAsInput(sink, source, pattern, ResultPartitionType.BLOCKING);
        return new ArrayList<>(Arrays.asList(source, sink));
    }

    /** Two graphs built from the same vertices must not share any registered execution attempt id. */
    @Test
    void testExecutionAttemptIdInTwoIdenticalJobsIsNotSame() throws Exception {
        JobVertex v1 = createVertex("vertex1", 5);
        JobVertex v2 = createVertex("vertex2", 7);
        JobVertex v3 = createVertex("vertex3", 2);
        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
        ExecutionGraph eg1 = createDefaultExecutionGraph(ordered);
        ExecutionGraph eg2 = createDefaultExecutionGraph(ordered);
        eg1.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);
        eg2.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);
        Assertions.assertThat(Sets.intersection(eg1.getRegisteredExecutions().keySet(), eg2.getRegisteredExecutions().keySet())).isEmpty();
    }

    /** Attaches the five-vertex topology in order v1..v5 and verifies every generated job vertex. */
    @Test
    void testCreateSimpleGraphBipartite() throws Exception {
        List<JobVertex> ordered = new ArrayList<>(createFiveVertexTopology());
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);
        verifyTestGraph(eg, ordered.get(0), ordered.get(1), ordered.get(2), ordered.get(3), ordered.get(4));
    }

    /**
     * Checks each vertex of the five-vertex topology against its expected input and output
     * vertices; {@code null} is passed where a vertex has no inputs or no outputs.
     */
    private void verifyTestGraph(ExecutionGraph eg, JobVertex v1, JobVertex v2, JobVertex v3, JobVertex v4, JobVertex v5) {
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(eg, v1, null, Collections.singletonList(v2));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(eg, v2, Collections.singletonList(v1), Collections.singletonList(v4));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(eg, v3, null, Arrays.asList(v4, v5));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(eg, v4, Arrays.asList(v2, v3), Collections.singletonList(v5));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(eg, v5, Arrays.asList(v4, v3), null);
    }

    /** Attaching the topology with v5 listed before its input v4 must fail with a {@link JobException}. */
    @Test
    void testCannotConnectWrongOrder() throws Exception {
        List<JobVertex> topology = createFiveVertexTopology();
        // Deliberately swap the last two vertices: v5 consumes v4 but is listed first.
        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(topology.get(0), topology.get(1), topology.get(2), topology.get(4), topology.get(3)));
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        Assertions.assertThatThrownBy(() -> eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP)).isInstanceOf(JobException.class);
    }

    /**
     * Vertices configured with an {@link InputSplitSource} must expose that source's assigner
     * through their execution job vertex after the graph is attached.
     */
    @Test
    void testSetupInputSplits() throws Exception {
        InputSplit[] emptySplits = new InputSplit[0];
        InputSplitAssigner assigner1 = new TestingInputSplitAssigner();
        InputSplitAssigner assigner2 = new TestingInputSplitAssigner();
        InputSplitSource<InputSplit> source1 = new TestingInputSplitSource<>(emptySplits, assigner1);
        InputSplitSource<InputSplit> source2 = new TestingInputSplitSource<>(emptySplits, assigner2);
        List<JobVertex> ordered = new ArrayList<>(createFiveVertexTopology());
        JobVertex v3 = ordered.get(2);
        JobVertex v5 = ordered.get(4);
        v3.setInputSplitSource(source1);
        v5.setInputSplitSource(source2);
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);
        Assertions.assertThat(eg.getAllVertices().get(v3.getID()).getSplitAssigner()).isEqualTo(assigner1);
        Assertions.assertThat(eg.getAllVertices().get(v5.getID()).getSplitAssigner()).isEqualTo(assigner2);
    }

    /**
     * Two consumers connected to the same data set id of one producer must both see that single
     * intermediate result as their only input and as their only consumed partition group.
     */
    @Test
    void testMultiConsumersForOneIntermediateResult() throws Exception {
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
        JobVertexConnectionUtils.connectNewDataSetAsInput(v2, v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, dataSetId, false);
        JobVertexConnectionUtils.connectNewDataSetAsInput(v3, v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, dataSetId, false);
        List<JobVertex> vertices = new ArrayList<>(Arrays.asList(v1, v2, v3));
        ExecutionGraph eg = createDefaultExecutionGraph(vertices);
        eg.attachJobGraph(vertices, JOB_MANAGER_JOB_METRIC_GROUP);

        ExecutionJobVertex ejv1 = Preconditions.checkNotNull(eg.getJobVertex(v1.getID()));
        Assertions.assertThat(ejv1.getProducedDataSets()).hasSize(1);
        Assertions.assertThat(ejv1.getProducedDataSets()[0].getId()).isEqualTo(dataSetId);

        ExecutionJobVertex ejv2 = Preconditions.checkNotNull(eg.getJobVertex(v2.getID()));
        Assertions.assertThat(ejv2.getInputs()).hasSize(1);
        Assertions.assertThat(ejv2.getInputs().get(0).getId()).isEqualTo(dataSetId);

        ExecutionJobVertex ejv3 = Preconditions.checkNotNull(eg.getJobVertex(v3.getID()));
        Assertions.assertThat(ejv3.getInputs()).hasSize(1);
        Assertions.assertThat(ejv3.getInputs().get(0).getId()).isEqualTo(dataSetId);

        List<ConsumedPartitionGroup> partitionGroups1 = ejv2.getTaskVertices()[0].getAllConsumedPartitionGroups();
        Assertions.assertThat(partitionGroups1).hasSize(1);
        Assertions.assertThat(partitionGroups1.get(0).getIntermediateDataSetID()).isEqualTo(dataSetId);

        List<ConsumedPartitionGroup> partitionGroups2 = ejv3.getTaskVertices()[0].getAllConsumedPartitionGroups();
        Assertions.assertThat(partitionGroups2).hasSize(1);
        Assertions.assertThat(partitionGroups2.get(0).getIntermediateDataSetID()).isEqualTo(dataSetId);
    }

    /**
     * With an ALL_TO_ALL edge, both producer partitions must report an equal first consumed
     * partition group, and that group must contain exactly the ids of both partitions.
     */
    @Test
    void testRegisterConsumedPartitionGroupToEdgeManager() throws Exception {
        List<JobVertex> ordered = createSourceAndSink(2, 2, DistributionPattern.ALL_TO_ALL);
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);
        IntermediateResult result = Objects.requireNonNull(eg.getJobVertex(ordered.get(0).getID())).getProducedDataSets()[0];
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        Assertions.assertThat(partition2.getConsumedPartitionGroups().get(0)).isEqualTo(partition1.getConsumedPartitionGroups().get(0));
        ConsumedPartitionGroup consumedPartitionGroup = partition1.getConsumedPartitionGroups().get(0);
        HashSet<IntermediateResultPartitionID> partitionIds = new HashSet<>();
        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
            partitionIds.add(partitionId);
        }
        Assertions.assertThat(partitionIds).containsExactlyInAnyOrder(partition1.getPartitionId(), partition2.getPartitionId());
    }

    /**
     * POINTWISE edge with 4 producers and 2 consumers: the groups of the first and the last
     * partition each start with two unfinished partitions and drop to zero once partitions 1+2,
     * respectively 3+4, are marked finished.
     */
    @Test
    void testPointWiseConsumedPartitionGroupPartitionFinished() throws Exception {
        List<JobVertex> ordered = createSourceAndSink(4, 2, DistributionPattern.POINTWISE);
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);
        IntermediateResult result = Objects.requireNonNull(eg.getJobVertex(ordered.get(0).getID())).getProducedDataSets()[0];
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        IntermediateResultPartition partition3 = result.getPartitions()[2];
        IntermediateResultPartition partition4 = result.getPartitions()[3];
        ConsumedPartitionGroup consumedPartitionGroup1 = partition1.getConsumedPartitionGroups().get(0);
        ConsumedPartitionGroup consumedPartitionGroup2 = partition4.getConsumedPartitionGroups().get(0);
        Assertions.assertThat(consumedPartitionGroup1.getNumberOfUnfinishedPartitions()).isEqualTo(2);
        Assertions.assertThat(consumedPartitionGroup2.getNumberOfUnfinishedPartitions()).isEqualTo(2);
        partition1.markFinished();
        partition2.markFinished();
        Assertions.assertThat(consumedPartitionGroup1.getNumberOfUnfinishedPartitions()).isZero();
        partition3.markFinished();
        partition4.markFinished();
        Assertions.assertThat(consumedPartitionGroup2.getNumberOfUnfinishedPartitions()).isZero();
    }

    /**
     * ALL_TO_ALL edge with 2 producers: the shared group counts down 2 -> 1 -> 0 as the two
     * partitions are marked finished one after the other.
     */
    @Test
    void testAllToAllConsumedPartitionGroupPartitionFinished() throws Exception {
        List<JobVertex> ordered = createSourceAndSink(2, 2, DistributionPattern.ALL_TO_ALL);
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);
        IntermediateResult result = Objects.requireNonNull(eg.getJobVertex(ordered.get(0).getID())).getProducedDataSets()[0];
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        ConsumedPartitionGroup consumedPartitionGroup = partition1.getConsumedPartitionGroups().get(0);
        Assertions.assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(2);
        partition1.markFinished();
        Assertions.assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isOne();
        partition2.markFinished();
        Assertions.assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isZero();
    }

    /**
     * Dynamic graph, ALL_TO_ALL: partitions finished before the consumer vertex is initialized
     * have no consumed partition groups yet; once the consumer is initialized, the group reports
     * zero unfinished partitions.
     */
    @Test
    void testDynamicGraphAllToAllConsumedPartitionGroupPartitionFinished() throws Exception {
        List<JobVertex> ordered = createSourceAndSink(2, 2, DistributionPattern.ALL_TO_ALL);
        ExecutionGraph eg = createDynamicExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        // Fail fast with a null check before the vertex is handed to initializeJobVertex.
        ExecutionJobVertex ejv1 = Objects.requireNonNull(eg.getJobVertex(ordered.get(0).getID()));
        eg.initializeJobVertex(ejv1, 0L);
        IntermediateResult result = ejv1.getProducedDataSets()[0];
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        partition1.markFinished();
        partition2.markFinished();
        Assertions.assertThat(partition1.getConsumedPartitionGroups()).isEmpty();

        ExecutionJobVertex ejv2 = Objects.requireNonNull(eg.getJobVertex(ordered.get(1).getID()));
        eg.initializeJobVertex(ejv2, 0L);
        ConsumedPartitionGroup consumedPartitionGroup = partition1.getConsumedPartitionGroups().get(0);
        Assertions.assertThat(consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isZero();
    }

    /**
     * Dynamic graph, POINTWISE (4 producers, 2 consumers): same scenario as the ALL_TO_ALL
     * variant, checked for the groups of the first and the last partition.
     */
    @Test
    void testDynamicGraphPointWiseConsumedPartitionGroupPartitionFinished() throws Exception {
        List<JobVertex> ordered = createSourceAndSink(4, 2, DistributionPattern.POINTWISE);
        ExecutionGraph eg = createDynamicExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);

        // Fail fast with a null check before the vertex is handed to initializeJobVertex.
        ExecutionJobVertex ejv1 = Objects.requireNonNull(eg.getJobVertex(ordered.get(0).getID()));
        eg.initializeJobVertex(ejv1, 0L);
        IntermediateResult result = ejv1.getProducedDataSets()[0];
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        IntermediateResultPartition partition3 = result.getPartitions()[2];
        IntermediateResultPartition partition4 = result.getPartitions()[3];
        partition1.markFinished();
        partition2.markFinished();
        partition3.markFinished();
        partition4.markFinished();
        Assertions.assertThat(partition1.getConsumedPartitionGroups()).isEmpty();
        Assertions.assertThat(partition4.getConsumedPartitionGroups()).isEmpty();

        ExecutionJobVertex ejv2 = Objects.requireNonNull(eg.getJobVertex(ordered.get(1).getID()));
        eg.initializeJobVertex(ejv2, 0L);
        ConsumedPartitionGroup consumedPartitionGroup1 = partition1.getConsumedPartitionGroups().get(0);
        Assertions.assertThat(consumedPartitionGroup1.getNumberOfUnfinishedPartitions()).isZero();
        ConsumedPartitionGroup consumedPartitionGroup2 = partition4.getConsumedPartitionGroups().get(0);
        Assertions.assertThat(consumedPartitionGroup2.getNumberOfUnfinishedPartitions()).isZero();
    }

    /** Attaching to a dynamic graph registers both job vertices but leaves them uninitialized. */
    @Test
    void testAttachToDynamicGraph() throws Exception {
        List<JobVertex> ordered = createSourceAndSink(2, 2, DistributionPattern.ALL_TO_ALL);
        ExecutionGraph eg = createDynamicExecutionGraph(ordered);
        eg.attachJobGraph(ordered, JOB_MANAGER_JOB_METRIC_GROUP);
        Assertions.assertThat(eg.getAllVertices()).hasSize(2);
        Iterator<ExecutionJobVertex> jobVertices = eg.getVerticesTopologically().iterator();
        Assertions.assertThat(jobVertices.next().isInitialized()).isFalse();
        Assertions.assertThat(jobVertices.next().isInitialized()).isFalse();
    }

    /**
     * {@link InputSplitSource} stub that hands back the splits and the assigner it was
     * constructed with, ignoring all method arguments.
     */
    private static final class TestingInputSplitSource<T extends InputSplit>
    implements InputSplitSource<T> {
        private final T[] inputSplits;
        private final InputSplitAssigner assigner;

        private TestingInputSplitSource(T[] inputSplits, InputSplitAssigner assigner) {
            this.inputSplits = inputSplits;
            this.assigner = assigner;
        }

        @Override
        public T[] createInputSplits(int minNumSplits) throws Exception {
            return this.inputSplits;
        }

        @Override
        public InputSplitAssigner getInputSplitAssigner(T[] inputSplits) {
            return this.assigner;
        }
    }

    /**
     * {@link InputSplitAssigner} stub: never returns a split and discards returned splits. Only
     * its identity matters to the tests.
     */
    private static final class TestingInputSplitAssigner
    implements InputSplitAssigner {
        @Override
        public InputSplit getNextInputSplit(String host, int taskId) {
            return null;
        }

        @Override
        public void returnInputSplit(List<InputSplit> splits, int taskId) {
        }
    }
}

