/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.EdgeManagerBuildUtil;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class EdgeManagerBuildUtilTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    EdgeManagerBuildUtilTest() {
    }

    @Test
    void testGetMaxNumEdgesToTargetInPointwiseConnection() throws Exception {
        this.testGetMaxNumEdgesToTarget(17, 17, DistributionPattern.POINTWISE);
        this.testGetMaxNumEdgesToTarget(17, 23, DistributionPattern.POINTWISE);
        this.testGetMaxNumEdgesToTarget(17, 34, DistributionPattern.POINTWISE);
        this.testGetMaxNumEdgesToTarget(34, 17, DistributionPattern.POINTWISE);
        this.testGetMaxNumEdgesToTarget(23, 17, DistributionPattern.POINTWISE);
    }

    @Test
    void testGetMaxNumEdgesToTargetInAllToAllConnection() throws Exception {
        this.testGetMaxNumEdgesToTarget(17, 17, DistributionPattern.ALL_TO_ALL);
        this.testGetMaxNumEdgesToTarget(17, 23, DistributionPattern.ALL_TO_ALL);
        this.testGetMaxNumEdgesToTarget(17, 34, DistributionPattern.ALL_TO_ALL);
        this.testGetMaxNumEdgesToTarget(34, 17, DistributionPattern.ALL_TO_ALL);
        this.testGetMaxNumEdgesToTarget(23, 17, DistributionPattern.ALL_TO_ALL);
    }

    private void testGetMaxNumEdgesToTarget(int upstream, int downstream, DistributionPattern pattern) throws Exception {
        int actual;
        Pair<ExecutionJobVertex, ExecutionJobVertex> pair = this.setupExecutionGraph(upstream, downstream, pattern);
        ExecutionJobVertex upstreamEJV = (ExecutionJobVertex)pair.getLeft();
        ExecutionJobVertex downstreamEJV = (ExecutionJobVertex)pair.getRight();
        int calculatedMaxForUpstream = EdgeManagerBuildUtil.computeMaxEdgesToTargetExecutionVertex((int)upstream, (int)downstream, (DistributionPattern)pattern);
        int actualMaxForUpstream = -1;
        for (ExecutionVertex ev : upstreamEJV.getTaskVertices()) {
            Assertions.assertThat((Map)ev.getProducedPartitions()).hasSize(1);
            IntermediateResultPartition partition = (IntermediateResultPartition)ev.getProducedPartitions().values().iterator().next();
            ConsumerVertexGroup consumerVertexGroup = (ConsumerVertexGroup)partition.getConsumerVertexGroups().get(0);
            actual = consumerVertexGroup.size();
            if (actual <= actualMaxForUpstream) continue;
            actualMaxForUpstream = actual;
        }
        Assertions.assertThat((int)actualMaxForUpstream).isEqualTo(calculatedMaxForUpstream);
        int calculatedMaxForDownstream = EdgeManagerBuildUtil.computeMaxEdgesToTargetExecutionVertex((int)downstream, (int)upstream, (DistributionPattern)pattern);
        int actualMaxForDownstream = -1;
        for (ExecutionVertex ev : downstreamEJV.getTaskVertices()) {
            Assertions.assertThat((int)ev.getNumberOfInputs()).isEqualTo(1);
            actual = ev.getConsumedPartitionGroup(0).size();
            if (actual <= actualMaxForDownstream) continue;
            actualMaxForDownstream = actual;
        }
        Assertions.assertThat((int)actualMaxForDownstream).isEqualTo(calculatedMaxForDownstream);
    }

    private Pair<ExecutionJobVertex, ExecutionJobVertex> setupExecutionGraph(int upstream, int downstream, DistributionPattern pattern) throws Exception {
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        v1.setParallelism(upstream);
        v2.setParallelism(downstream);
        v1.setInvokableClass(AbstractInvokable.class);
        v2.setInvokableClass(AbstractInvokable.class);
        v2.connectNewDataSetAsInput(v1, pattern, ResultPartitionType.PIPELINED);
        ArrayList<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
        DefaultExecutionGraph eg = TestingDefaultExecutionGraphBuilder.newBuilder().setVertexParallelismStore(SchedulerBase.computeVertexParallelismStore(ordered)).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        eg.attachJobGraph(ordered);
        return Pair.of(eg.getAllVertices().get(v1.getID()), eg.getAllVertices().get(v2.getID()));
    }
}

