/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertexTest;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public class IntermediateResultPartitionTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    @Test
    void testPartitionDataAllProduced() throws Exception {
        IntermediateResult result = IntermediateResultPartitionTest.createResult(ResultPartitionType.BLOCKING, 2);
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        ConsumedPartitionGroup consumedPartitionGroup = (ConsumedPartitionGroup)partition1.getConsumedPartitionGroups().get(0);
        Assertions.assertThat((boolean)partition1.hasDataAllProduced()).isFalse();
        Assertions.assertThat((boolean)partition2.hasDataAllProduced()).isFalse();
        Assertions.assertThat((boolean)consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
        partition1.markFinished();
        Assertions.assertThat((boolean)partition1.hasDataAllProduced()).isTrue();
        Assertions.assertThat((boolean)partition2.hasDataAllProduced()).isFalse();
        Assertions.assertThat((boolean)consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
        partition2.markFinished();
        Assertions.assertThat((boolean)partition1.hasDataAllProduced()).isTrue();
        Assertions.assertThat((boolean)partition2.hasDataAllProduced()).isTrue();
        Assertions.assertThat((boolean)consumedPartitionGroup.areAllPartitionsFinished()).isTrue();
        result.resetForNewExecution();
        Assertions.assertThat((boolean)partition1.hasDataAllProduced()).isFalse();
        Assertions.assertThat((boolean)partition2.hasDataAllProduced()).isFalse();
        Assertions.assertThat((boolean)consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
    }

    @Test
    void testBlockingPartitionResetting() throws Exception {
        IntermediateResult result = IntermediateResultPartitionTest.createResult(ResultPartitionType.BLOCKING, 2);
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        ConsumedPartitionGroup consumedPartitionGroup = (ConsumedPartitionGroup)partition1.getConsumedPartitionGroups().get(0);
        Assertions.assertThat((boolean)partition1.hasDataAllProduced()).isFalse();
        Assertions.assertThat((boolean)partition2.hasDataAllProduced()).isFalse();
        partition1.markFinished();
        Assertions.assertThat((int)consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(1);
        Assertions.assertThat((boolean)partition1.hasDataAllProduced()).isTrue();
        Assertions.assertThat((boolean)partition2.hasDataAllProduced()).isFalse();
        Assertions.assertThat((boolean)consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
        result.resetForNewExecution();
        Assertions.assertThat((int)consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(2);
        partition2.markFinished();
        Assertions.assertThat((int)consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(1);
        Assertions.assertThat((boolean)partition1.hasDataAllProduced()).isFalse();
        Assertions.assertThat((boolean)partition2.hasDataAllProduced()).isTrue();
        Assertions.assertThat((boolean)consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
        partition1.markFinished();
        Assertions.assertThat((int)consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(0);
        Assertions.assertThat((boolean)partition1.hasDataAllProduced()).isTrue();
        Assertions.assertThat((boolean)partition2.hasDataAllProduced()).isTrue();
        Assertions.assertThat((boolean)consumedPartitionGroup.areAllPartitionsFinished()).isTrue();
        result.resetForNewExecution();
        Assertions.assertThat((int)consumedPartitionGroup.getNumberOfUnfinishedPartitions()).isEqualTo(2);
        Assertions.assertThat((boolean)partition1.hasDataAllProduced()).isFalse();
        Assertions.assertThat((boolean)partition2.hasDataAllProduced()).isFalse();
        Assertions.assertThat((boolean)consumedPartitionGroup.areAllPartitionsFinished()).isFalse();
    }

    @Test
    void testReleasePartitionGroups() throws Exception {
        IntermediateResult result = IntermediateResultPartitionTest.createResult(ResultPartitionType.BLOCKING, 2);
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        Assertions.assertThat((boolean)partition1.canBeReleased()).isFalse();
        Assertions.assertThat((boolean)partition2.canBeReleased()).isFalse();
        List consumedPartitionGroup1 = partition1.getConsumedPartitionGroups();
        List consumedPartitionGroup2 = partition2.getConsumedPartitionGroups();
        Assertions.assertThat((List)consumedPartitionGroup1).isEqualTo((Object)consumedPartitionGroup2);
        Assertions.assertThat((List)consumedPartitionGroup1).hasSize(2);
        partition1.markPartitionGroupReleasable((ConsumedPartitionGroup)consumedPartitionGroup1.get(0));
        Assertions.assertThat((boolean)partition1.canBeReleased()).isFalse();
        partition1.markPartitionGroupReleasable((ConsumedPartitionGroup)consumedPartitionGroup1.get(1));
        Assertions.assertThat((boolean)partition1.canBeReleased()).isTrue();
        result.resetForNewExecution();
        Assertions.assertThat((boolean)partition1.canBeReleased()).isFalse();
    }

    @Test
    void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph() throws Exception {
        this.testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false, Arrays.asList(7, 7));
    }

    @Test
    void testGetNumberOfSubpartitionsForNonDynamicPointwiseGraph() throws Exception {
        this.testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, false, Arrays.asList(4, 3));
    }

    @Test
    void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicAllToAllGraph() throws Exception {
        this.testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(7, 7));
    }

    @Test
    void testGetNumberOfSubpartitionsFromConsumerParallelismForDynamicPointwiseGraph() throws Exception {
        this.testGetNumberOfSubpartitions(7, DistributionPattern.POINTWISE, true, Arrays.asList(4, 4));
    }

    @Test
    void testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicAllToAllGraph() throws Exception {
        this.testGetNumberOfSubpartitions(-1, DistributionPattern.ALL_TO_ALL, true, Arrays.asList(13, 13));
    }

    @Test
    void testGetNumberOfSubpartitionsFromConsumerMaxParallelismForDynamicPointwiseGraph() throws Exception {
        this.testGetNumberOfSubpartitions(-1, DistributionPattern.POINTWISE, true, Arrays.asList(7, 7));
    }

    private void testGetNumberOfSubpartitions(int consumerParallelism, DistributionPattern distributionPattern, boolean isDynamicGraph, List<Integer> expectedNumSubpartitions) throws Exception {
        int producerParallelism = 2;
        int consumerMaxParallelism = 13;
        ExecutionGraph eg = IntermediateResultPartitionTest.createExecutionGraph(2, consumerParallelism, 13, distributionPattern, isDynamicGraph, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        Iterator vertexIterator = eg.getVerticesTopologically().iterator();
        ExecutionJobVertex producer = (ExecutionJobVertex)vertexIterator.next();
        if (isDynamicGraph) {
            ExecutionJobVertexTest.initializeVertex(producer);
        }
        IntermediateResult result = producer.getProducedDataSets()[0];
        Assertions.assertThat(expectedNumSubpartitions).hasSize(2);
        Assertions.assertThat(Arrays.stream(result.getPartitions()).map(IntermediateResultPartition::getNumberOfSubpartitions).collect(Collectors.toList())).isEqualTo(expectedNumSubpartitions);
    }

    public static ExecutionGraph createExecutionGraph(int producerParallelism, int consumerParallelism, int consumerMaxParallelism, DistributionPattern distributionPattern, boolean isDynamicGraph, ScheduledExecutorService scheduledExecutorService) throws Exception {
        JobVertex v1 = new JobVertex("v1");
        v1.setInvokableClass(NoOpInvokable.class);
        v1.setParallelism(producerParallelism);
        JobVertex v2 = new JobVertex("v2");
        v2.setInvokableClass(NoOpInvokable.class);
        if (consumerParallelism > 0) {
            v2.setParallelism(consumerParallelism);
        }
        if (consumerMaxParallelism > 0) {
            v2.setMaxParallelism(consumerMaxParallelism);
        }
        JobVertex v3 = new JobVertex("v3");
        v3.setInvokableClass(NoOpInvokable.class);
        if (consumerParallelism > 0) {
            v3.setParallelism(consumerParallelism);
        }
        if (consumerMaxParallelism > 0) {
            v3.setMaxParallelism(consumerMaxParallelism);
        }
        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
        v2.connectNewDataSetAsInput(v1, distributionPattern, ResultPartitionType.BLOCKING, dataSetId, false);
        v3.connectNewDataSetAsInput(v1, distributionPattern, ResultPartitionType.BLOCKING, dataSetId, false);
        JobGraph jobGraph = JobGraphBuilder.newBatchJobGraphBuilder().addJobVertices(Arrays.asList(v1, v2, v3)).build();
        Configuration configuration = new Configuration();
        TestingDefaultExecutionGraphBuilder builder = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setJobMasterConfig(configuration).setVertexParallelismStore(IntermediateResultPartitionTest.computeVertexParallelismStoreConsideringDynamicGraph(jobGraph.getVertices(), isDynamicGraph, consumerMaxParallelism));
        if (isDynamicGraph) {
            return builder.buildDynamicGraph(scheduledExecutorService);
        }
        return builder.build(scheduledExecutorService);
    }

    public static VertexParallelismStore computeVertexParallelismStoreConsideringDynamicGraph(Iterable<JobVertex> vertices, boolean isDynamicGraph, int defaultMaxParallelism) {
        if (isDynamicGraph) {
            return AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(vertices, (int)defaultMaxParallelism);
        }
        return SchedulerBase.computeVertexParallelismStore(vertices);
    }

    private static IntermediateResult createResult(ResultPartitionType resultPartitionType, int parallelism) throws Exception {
        JobVertex source = new JobVertex("v1");
        source.setInvokableClass(NoOpInvokable.class);
        source.setParallelism(parallelism);
        JobVertex sink1 = new JobVertex("v2");
        sink1.setInvokableClass(NoOpInvokable.class);
        sink1.setParallelism(parallelism);
        JobVertex sink2 = new JobVertex("v3");
        sink2.setInvokableClass(NoOpInvokable.class);
        sink2.setParallelism(parallelism);
        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
        sink1.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, resultPartitionType, dataSetId, false);
        sink2.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, resultPartitionType, dataSetId, false);
        DirectScheduledExecutorService executorService = new DirectScheduledExecutorService();
        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink1, sink2);
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), executorService).build();
        ExecutionJobVertex ejv = scheduler.getExecutionJobVertex(source.getID());
        return ejv.getProducedDataSets()[0];
    }
}

