/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.StrategyTestUtil;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.runtime.topology.VertexID;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Unit tests for {@link PipelinedRegionSchedulingStrategy}.
 *
 * <p>Two kinds of fixtures are used: a hand-built {@link TestingSchedulingTopology} (see {@link
 * #buildTopology()}) and real execution graphs built from {@link JobVertex} instances (see {@link
 * #createExecutionGraph(JobVertex...)}). Scheduling decisions are recorded by a {@link
 * TestingSchedulerOperations} that is recreated before every test.
 */
class PipelinedRegionSchedulingStrategyTest {

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /** Parallelism of every vertex group in the topology created by {@link #buildTopology()}. */
    private static final int PARALLELISM = 2;

    /** Records every scheduling call issued by the strategy under test. */
    private TestingSchedulerOperations testingSchedulerOperation;

    private TestingSchedulingTopology testingSchedulingTopology;

    private List<TestingSchedulingExecutionVertex> source;
    private List<TestingSchedulingExecutionVertex> map1;
    private List<TestingSchedulingExecutionVertex> map2;
    private List<TestingSchedulingExecutionVertex> map3;
    private List<TestingSchedulingExecutionVertex> sink;

    @BeforeEach
    void setUp() {
        testingSchedulerOperation = new TestingSchedulerOperations();
        buildTopology();
    }

    /**
     * Builds the default topology shared by the topology-based tests:
     *
     * <pre>
     * source --pointwise, PIPELINED_BOUNDED--> map1
     * map1   --pointwise, HYBRID_FULL-------> map2
     * map2   --pointwise, HYBRID_SELECTIVE--> map3
     * map3   --all-to-all, BLOCKING---------> sink
     * </pre>
     *
     * <p>Every vertex group has {@link #PARALLELISM} subtasks and every result partition starts in
     * state {@link ResultPartitionState#CREATED}.
     */
    private void buildTopology() {
        testingSchedulingTopology = new TestingSchedulingTopology();

        source =
                testingSchedulingTopology
                        .addExecutionVertices()
                        .withParallelism(PARALLELISM)
                        .finish();
        map1 =
                testingSchedulingTopology
                        .addExecutionVertices()
                        .withParallelism(PARALLELISM)
                        .finish();
        map2 =
                testingSchedulingTopology
                        .addExecutionVertices()
                        .withParallelism(PARALLELISM)
                        .finish();
        map3 =
                testingSchedulingTopology
                        .addExecutionVertices()
                        .withParallelism(PARALLELISM)
                        .finish();
        sink =
                testingSchedulingTopology
                        .addExecutionVertices()
                        .withParallelism(PARALLELISM)
                        .finish();

        testingSchedulingTopology
                .connectPointwise(source, map1)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
                .finish();
        testingSchedulingTopology
                .connectPointwise(map1, map2)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.HYBRID_FULL)
                .finish();
        testingSchedulingTopology
                .connectPointwise(map2, map3)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.HYBRID_SELECTIVE)
                .finish();
        testingSchedulingTopology
                .connectAllToAll(map3, sink)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.BLOCKING)
                .finish();
    }

    /** Starting the strategy on the default topology schedules everything except the sinks. */
    @Test
    void testStartScheduling() {
        startScheduling(testingSchedulingTopology);

        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(
                expectedInitiallyScheduledVertices(), testingSchedulerOperation);
    }

    /** Restarting all vertices is expected to schedule the same groups as the initial start. */
    @Test
    void testRestartTasks() {
        PipelinedRegionSchedulingStrategy schedulingStrategy =
                startScheduling(testingSchedulingTopology);

        Set<ExecutionVertexID> verticesToRestart =
                Stream.of(source, map1, map2, map3, sink)
                        .flatMap(Collection::stream)
                        .map(TestingSchedulingExecutionVertex::getId)
                        .collect(Collectors.toSet());

        schedulingStrategy.restartTasks(verticesToRestart);

        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(
                expectedInitiallyScheduledVertices(), testingSchedulerOperation);
    }

    /**
     * The sinks consume map3 through an all-to-all BLOCKING edge, so they are only expected to be
     * scheduled once both map3 subtasks have finished.
     */
    @Test
    void testNotifyingBlockingResultPartitionProducerFinished() {
        PipelinedRegionSchedulingStrategy schedulingStrategy =
                startScheduling(testingSchedulingTopology);

        // Finishing only the first producer must not trigger any new scheduling.
        TestingSchedulingExecutionVertex upstream1 = map3.get(0);
        upstream1.getProducedResults().iterator().next().markFinished();
        schedulingStrategy.onExecutionStateChange(upstream1.getId(), ExecutionState.FINISHED);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(6);

        // Finishing the second producer releases both sink subtasks.
        TestingSchedulingExecutionVertex upstream2 = map3.get(1);
        upstream2.getProducedResults().iterator().next().markFinished();
        schedulingStrategy.onExecutionStateChange(upstream2.getId(), ExecutionState.FINISHED);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(8);

        List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<>();
        expectedScheduledVertices.add(Arrays.asList(sink.get(0)));
        expectedScheduledVertices.add(Arrays.asList(sink.get(1)));
        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(
                expectedScheduledVertices, testingSchedulerOperation);
    }

    /** With a BLOCKING_PERSISTENT edge only the producer is expected to be scheduled on start. */
    @Test
    void testSchedulingTopologyWithPersistentBlockingEdges() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> v1 =
                topology.addExecutionVertices().withParallelism(1).finish();
        List<TestingSchedulingExecutionVertex> v2 =
                topology.addExecutionVertices().withParallelism(1).finish();
        topology.connectPointwise(v1, v2)
                .withResultPartitionState(ResultPartitionState.CREATED)
                .withResultPartitionType(ResultPartitionType.BLOCKING_PERSISTENT)
                .finish();

        startScheduling(topology);

        List<List<TestingSchedulingExecutionVertex>> expectedScheduledVertices = new ArrayList<>();
        expectedScheduledVertices.add(Arrays.asList(v1.get(0)));
        StrategyTestUtil.assertLatestScheduledVerticesAreEqualTo(
                expectedScheduledVertices, testingSchedulerOperation);
    }

    /**
     * For v1(4) -> v2(3) [PIPELINED], v2 -> v3(2) [BLOCKING], v1 -> v3 [PIPELINED] exactly one
     * cross-region consumed partition group is expected: the first one consumed by the second
     * subtask of v3.
     */
    @Test
    void testComputingCrossRegionConsumedPartitionGroupsCorrectly() throws Exception {
        JobVertex v1 = createJobVertex("v1", 4);
        JobVertex v2 = createJobVertex("v2", 3);
        JobVertex v3 = createJobVertex("v3", 2);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v3, v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v3, v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        DefaultExecutionGraph executionGraph = createExecutionGraph(v1, v2, v3);
        SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();

        PipelinedRegionSchedulingStrategy schedulingStrategy =
                new PipelinedRegionSchedulingStrategy(
                        testingSchedulerOperation, schedulingTopology);

        Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups =
                schedulingStrategy.getCrossRegionConsumedPartitionGroups();
        Assertions.assertThat(crossRegionConsumedPartitionGroups).hasSize(1);

        ConsumedPartitionGroup expected =
                subtask(executionGraph, v3, 1).getAllConsumedPartitionGroups().get(0);
        Assertions.assertThat(crossRegionConsumedPartitionGroups).contains(expected);
    }

    /** An all-to-all BLOCKING edge must not yield any cross-region consumed partition group. */
    @Test
    void testNoCrossRegionConsumedPartitionGroupsWithAllToAllBlockingEdge() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producer =
                topology.addExecutionVertices().withParallelism(4).finish();
        List<TestingSchedulingExecutionVertex> consumer =
                topology.addExecutionVertices().withParallelism(4).finish();
        topology.connectAllToAll(producer, consumer)
                .withResultPartitionType(ResultPartitionType.BLOCKING)
                .finish();

        PipelinedRegionSchedulingStrategy schedulingStrategy =
                new PipelinedRegionSchedulingStrategy(testingSchedulerOperation, topology);

        Set<ConsumedPartitionGroup> crossRegionConsumedPartitionGroups =
                schedulingStrategy.getCrossRegionConsumedPartitionGroups();
        Assertions.assertThat(crossRegionConsumedPartitionGroups).isEmpty();
    }

    /**
     * With a BLOCKING v2 -> v3 edge the job splits into a 5-vertex and a 4-vertex region. Only the
     * first is expected on start; the second follows once the second subtask of v2 has finished.
     */
    @Test
    void testSchedulingTopologyWithBlockingCrossRegionConsumedPartitionGroups() throws Exception {
        JobVertex v1 = createJobVertex("v1", 4);
        JobVertex v2 = createJobVertex("v2", 3);
        JobVertex v3 = createJobVertex("v3", 2);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v3, v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v3, v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        DefaultExecutionGraph executionGraph = createExecutionGraph(v1, v2, v3);
        SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();

        List<SchedulingPipelinedRegion> regions = new ArrayList<>();
        schedulingTopology.getAllPipelinedRegions().forEach(regions::add);
        Assertions.assertThat(regions).hasSize(2);

        Set<ExecutionVertexID> region1 =
                vertexIdsOfRegionContaining(schedulingTopology, subtask(executionGraph, v3, 0));
        Assertions.assertThat(region1).hasSize(5);

        Set<ExecutionVertexID> region2 =
                vertexIdsOfRegionContaining(schedulingTopology, subtask(executionGraph, v3, 1));
        Assertions.assertThat(region2).hasSize(4);

        PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology);

        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(1);
        List<ExecutionVertexID> scheduledVertices1 =
                testingSchedulerOperation.getScheduledVertices().get(0);
        Assertions.assertThat(scheduledVertices1).hasSize(5);
        Assertions.assertThat(region1).containsAll(scheduledVertices1);

        finishWithPartitions(schedulingStrategy, subtask(executionGraph, v2, 1));

        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);
        List<ExecutionVertexID> scheduledVertices2 =
                testingSchedulerOperation.getScheduledVertices().get(1);
        Assertions.assertThat(scheduledVertices2).hasSize(4);
        Assertions.assertThat(region2).containsAll(scheduledVertices2);
    }

    /**
     * Same job shape as the blocking variant, but with a HYBRID_FULL v2 -> v3 edge: both regions
     * are expected to be scheduled directly on start.
     */
    @Test
    void testSchedulingTopologyWithHybridCrossRegionConsumedPartitionGroups() throws Exception {
        JobVertex v1 = createJobVertex("v1", 4);
        JobVertex v2 = createJobVertex("v2", 3);
        JobVertex v3 = createJobVertex("v3", 2);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v3, v2, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_FULL);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v3, v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        DefaultExecutionGraph executionGraph = createExecutionGraph(v1, v2, v3);
        SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();

        List<SchedulingPipelinedRegion> regions = new ArrayList<>();
        schedulingTopology.getAllPipelinedRegions().forEach(regions::add);
        Assertions.assertThat(regions).hasSize(2);

        Set<ExecutionVertexID> region1 =
                vertexIdsOfRegionContaining(schedulingTopology, subtask(executionGraph, v3, 0));
        Assertions.assertThat(region1).hasSize(5);

        Set<ExecutionVertexID> region2 =
                vertexIdsOfRegionContaining(schedulingTopology, subtask(executionGraph, v3, 1));
        Assertions.assertThat(region2).hasSize(4);

        startScheduling(schedulingTopology);

        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);

        List<ExecutionVertexID> scheduledVertices1 =
                testingSchedulerOperation.getScheduledVertices().get(0);
        Assertions.assertThat(scheduledVertices1).hasSize(5);
        Assertions.assertThat(region1).containsAll(scheduledVertices1);

        List<ExecutionVertexID> scheduledVertices2 =
                testingSchedulerOperation.getScheduledVertices().get(1);
        Assertions.assertThat(scheduledVertices2).hasSize(4);
        Assertions.assertThat(region2).containsAll(scheduledVertices2);
    }

    /**
     * With a pointwise BLOCKING edge, finishing a single producer subtask is expected to trigger
     * exactly one additional scheduling call.
     */
    @Test
    void testScheduleBlockingDownstreamTaskIndividually() throws Exception {
        JobVertex v1 = createJobVertex("v1", 2);
        JobVertex v2 = createJobVertex("v2", 2);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        DefaultExecutionGraph executionGraph = createExecutionGraph(v1, v2);
        SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();

        PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);

        finishWithPartitions(schedulingStrategy, subtask(executionGraph, v1, 0));
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3);
    }

    /**
     * Consumers of hybrid partitions are scheduled on start; reporting the producer as FINISHED
     * afterwards must not schedule anything again.
     */
    @Test
    void testFinishHybridPartitionWillNotRescheduleDownstream() throws Exception {
        JobVertex v1 = createJobVertex("v1", 1);
        JobVertex v2 = createJobVertex("v2", 1);
        JobVertex v3 = createJobVertex("v3", 1);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_FULL);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v3, v1, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_SELECTIVE);

        DefaultExecutionGraph executionGraph = createExecutionGraph(v1, v2, v3);
        SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();

        PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3);

        // Only the state change is reported here; the partitions are deliberately not finished.
        ExecutionVertex v11 = subtask(executionGraph, v1, 0);
        schedulingStrategy.onExecutionStateChange(v11.getID(), ExecutionState.FINISHED);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3);
    }

    /**
     * Chain v1 -HYBRID_FULL-> v2 -BLOCKING-> v3 -HYBRID_SELECTIVE-> v4: v1 and v2 are expected on
     * start, v3 and v4 only after v2 has finished.
     */
    @Test
    void testScheduleTopologyWithHybridAndBlockingEdge() throws Exception {
        JobVertex v1 = createJobVertex("v1", 1);
        JobVertex v2 = createJobVertex("v2", 1);
        JobVertex v3 = createJobVertex("v3", 1);
        JobVertex v4 = createJobVertex("v4", 1);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_FULL);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v3, v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v3, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_SELECTIVE);

        DefaultExecutionGraph executionGraph = createExecutionGraph(v1, v2, v3, v4);
        SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();

        PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(2);

        ExecutionVertex v11 = subtask(executionGraph, v1, 0);
        ExecutionVertex v21 = subtask(executionGraph, v2, 0);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices().get(0))
                .containsExactly(v11.getID());
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices().get(1))
                .containsExactly(v21.getID());

        finishWithPartitions(schedulingStrategy, v21);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4);

        ExecutionVertex v31 = subtask(executionGraph, v3, 0);
        ExecutionVertex v41 = subtask(executionGraph, v4, 0);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices().get(2))
                .containsExactly(v31.getID());
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices().get(3))
                .containsExactly(v41.getID());
    }

    /**
     * All five vertices are expected to be scheduled in one call even though hybrid and blocking
     * edges run in parallel to the pipelined ones.
     */
    @Test
    void testSchedulingRegionWithInnerNonPipelinedEdge() throws Exception {
        JobVertex v1 = createJobVertex("v1", 1);
        JobVertex v2 = createJobVertex("v2", 1);
        JobVertex v3 = createJobVertex("v3", 1);
        JobVertex v4 = createJobVertex("v4", 1);
        JobVertex v5 = createJobVertex("v5", 1);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v2, v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v3, v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v5, v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        // Additional non-pipelined edges between vertices already connected via v2.
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v3, v1, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_FULL);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v1, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_SELECTIVE);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        DefaultExecutionGraph executionGraph = createExecutionGraph(v1, v2, v3, v4, v5);
        SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();

        startScheduling(schedulingTopology);

        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(1);
        List<ExecutionVertexID> executionVertexIds =
                testingSchedulerOperation.getScheduledVertices().get(0);
        Assertions.assertThat(executionVertexIds).hasSize(5);
    }

    /**
     * v4 consumes v1 (BLOCKING), v2 (HYBRID_FULL) and v3 (HYBRID_SELECTIVE): three scheduling
     * calls are expected on start and a fourth one only after v1 has finished.
     */
    @Test
    void testDownstreamRegionWillBeBlockedByBlockingEdge() throws Exception {
        JobVertex v1 = createJobVertex("v1", 1);
        JobVertex v2 = createJobVertex("v2", 1);
        JobVertex v3 = createJobVertex("v3", 1);
        JobVertex v4 = createJobVertex("v4", 1);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v2, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_FULL);
        JobVertexConnectionUtils.connectNewDataSetAsInput(
                v4, v3, DistributionPattern.POINTWISE, ResultPartitionType.HYBRID_SELECTIVE);

        DefaultExecutionGraph executionGraph = createExecutionGraph(v1, v2, v3, v4);
        SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();

        PipelinedRegionSchedulingStrategy schedulingStrategy = startScheduling(schedulingTopology);
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(3);

        finishWithPartitions(schedulingStrategy, subtask(executionGraph, v1, 0));
        Assertions.assertThat(testingSchedulerOperation.getScheduledVertices()).hasSize(4);
    }

    /**
     * Returns the vertex groups expected after starting (or fully restarting) the default
     * topology: one group per source/map1 pair, one per map2 subtask and one per map3 subtask. The
     * sinks are absent.
     */
    private List<List<TestingSchedulingExecutionVertex>> expectedInitiallyScheduledVertices() {
        List<List<TestingSchedulingExecutionVertex>> expected = new ArrayList<>();
        expected.add(Arrays.asList(source.get(0), map1.get(0)));
        expected.add(Arrays.asList(source.get(1), map1.get(1)));
        expected.add(Arrays.asList(map2.get(0)));
        expected.add(Arrays.asList(map2.get(1)));
        expected.add(Arrays.asList(map3.get(0)));
        expected.add(Arrays.asList(map3.get(1)));
        return expected;
    }

    /**
     * Creates a named {@link JobVertex} with the given parallelism and {@link AbstractInvokable} as
     * its invokable class.
     */
    private static JobVertex createJobVertex(String vertexName, int parallelism) {
        JobVertex jobVertex = new JobVertex(vertexName);
        jobVertex.setParallelism(parallelism);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        return jobVertex;
    }

    /**
     * Builds a batch job graph from the given vertices, in the given order, and turns it into an
     * execution graph running on the shared test executor.
     */
    private static DefaultExecutionGraph createExecutionGraph(JobVertex... orderedVertices)
            throws Exception {
        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(orderedVertices));
        JobGraph jobGraph =
                JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(ordered).build();
        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setJobGraph(jobGraph)
                .build(EXECUTOR_RESOURCE.getExecutor());
    }

    /** Returns the execution vertex at {@code subtaskIndex} of the given job vertex. */
    private static ExecutionVertex subtask(
            DefaultExecutionGraph executionGraph, JobVertex jobVertex, int subtaskIndex) {
        return executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[subtaskIndex];
    }

    /** Collects the IDs of all vertices in the pipelined region that contains {@code vertex}. */
    private static Set<ExecutionVertexID> vertexIdsOfRegionContaining(
            SchedulingTopology schedulingTopology, ExecutionVertex vertex) {
        Set<ExecutionVertexID> vertexIds = new HashSet<>();
        schedulingTopology
                .getPipelinedRegionOfVertex(vertex.getID())
                .getVertices()
                .forEach(regionVertex -> vertexIds.add(regionVertex.getId()));
        return vertexIds;
    }

    /**
     * Finishes the partitions of {@code vertex} (if needed) and then notifies the strategy that
     * the vertex reached {@link ExecutionState#FINISHED}.
     */
    private static void finishWithPartitions(
            PipelinedRegionSchedulingStrategy schedulingStrategy, ExecutionVertex vertex)
            throws Exception {
        vertex.finishPartitionsIfNeeded();
        schedulingStrategy.onExecutionStateChange(vertex.getID(), ExecutionState.FINISHED);
    }

    /** Creates a strategy for the given topology, starts scheduling and returns the strategy. */
    private PipelinedRegionSchedulingStrategy startScheduling(
            SchedulingTopology schedulingTopology) {
        PipelinedRegionSchedulingStrategy schedulingStrategy =
                new PipelinedRegionSchedulingStrategy(
                        testingSchedulerOperation, schedulingTopology);
        schedulingStrategy.startScheduling();
        return schedulingStrategy;
    }
}

