/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.DefaultSubtaskAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.SubtaskAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ExecutionJobVertexTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    ExecutionJobVertexTest() {
    }

    @Test
    void testParallelismGreaterThanMaxParallelism() {
        JobVertex jobVertex = new JobVertex("testVertex");
        jobVertex.setInvokableClass(AbstractInvokable.class);
        jobVertex.setParallelism(172);
        jobVertex.setMaxParallelism(4);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex)).isInstanceOf(JobException.class)).hasMessageContaining("higher than the max parallelism");
    }

    @Test
    void testLazyInitialization() throws Exception {
        int parallelism = 3;
        int configuredMaxParallelism = 12;
        ExecutionJobVertex ejv = ExecutionJobVertexTest.createDynamicExecutionJobVertex(3, 12, -1);
        Assertions.assertThat((int)ejv.getParallelism()).isEqualTo(3);
        Assertions.assertThat((int)ejv.getMaxParallelism()).isEqualTo(12);
        Assertions.assertThat((boolean)ejv.isInitialized()).isFalse();
        Assertions.assertThat((Object[])ejv.getTaskVertices()).isEmpty();
        Assertions.assertThatThrownBy(() -> ((ExecutionJobVertex)ejv).getInputs()).isInstanceOf(IllegalStateException.class);
        Assertions.assertThatThrownBy(() -> ((ExecutionJobVertex)ejv).getProducedDataSets()).isInstanceOf(IllegalStateException.class);
        Assertions.assertThatThrownBy(() -> ((ExecutionJobVertex)ejv).getSplitAssigner()).isInstanceOf(IllegalStateException.class);
        Assertions.assertThatThrownBy(() -> ((ExecutionJobVertex)ejv).getOperatorCoordinators()).isInstanceOf(IllegalStateException.class);
        Assertions.assertThatThrownBy(() -> ejv.connectToPredecessors(Collections.emptyMap())).isInstanceOf(IllegalStateException.class);
        Assertions.assertThatThrownBy(() -> ((ExecutionJobVertex)ejv).executionVertexFinished()).isInstanceOf(IllegalStateException.class);
        Assertions.assertThatThrownBy(() -> ((ExecutionJobVertex)ejv).executionVertexUnFinished()).isInstanceOf(IllegalStateException.class);
        ExecutionJobVertexTest.initializeVertex(ejv);
        Assertions.assertThat((boolean)ejv.isInitialized()).isTrue();
        Assertions.assertThat((Object[])ejv.getTaskVertices()).hasSize(3);
        Assertions.assertThat((List)ejv.getInputs()).isEmpty();
        Assertions.assertThat((Object[])ejv.getProducedDataSets()).hasSize(1);
        Assertions.assertThat((Collection)ejv.getOperatorCoordinators()).isEmpty();
    }

    @Test
    void testErrorIfInitializationWithoutParallelismDecided() throws Exception {
        ExecutionJobVertex ejv = ExecutionJobVertexTest.createDynamicExecutionJobVertex();
        Assertions.assertThatThrownBy(() -> ExecutionJobVertexTest.initializeVertex(ejv)).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testSetParallelismLazily() throws Exception {
        int parallelism = 3;
        int defaultMaxParallelism = 13;
        ExecutionJobVertex ejv = ExecutionJobVertexTest.createDynamicExecutionJobVertex(-1, -1, 13);
        Assertions.assertThat((boolean)ejv.isParallelismDecided()).isFalse();
        ejv.setParallelism(3);
        Assertions.assertThat((boolean)ejv.isParallelismDecided()).isTrue();
        Assertions.assertThat((int)ejv.getParallelism()).isEqualTo(3);
        ExecutionJobVertexTest.initializeVertex(ejv);
        Assertions.assertThat((Object[])ejv.getTaskVertices()).hasSize(3);
    }

    @Test
    void testConfiguredMaxParallelismIsRespected() throws Exception {
        int configuredMaxParallelism = 12;
        int defaultMaxParallelism = 13;
        ExecutionJobVertex ejv = ExecutionJobVertexTest.createDynamicExecutionJobVertex(-1, 12, 13);
        Assertions.assertThat((int)ejv.getMaxParallelism()).isEqualTo(12);
    }

    @Test
    void testComputingMaxParallelismFromConfiguredParallelism() throws Exception {
        int parallelism = 300;
        int defaultMaxParallelism = 13;
        ExecutionJobVertex ejv = ExecutionJobVertexTest.createDynamicExecutionJobVertex(300, -1, 13);
        Assertions.assertThat((int)ejv.getMaxParallelism()).isEqualTo(512);
    }

    @Test
    void testFallingBackToDefaultMaxParallelism() throws Exception {
        int defaultMaxParallelism = 13;
        ExecutionJobVertex ejv = ExecutionJobVertexTest.createDynamicExecutionJobVertex(-1, -1, 13);
        Assertions.assertThat((int)ejv.getMaxParallelism()).isEqualTo(13);
    }

    static void initializeVertex(ExecutionJobVertex vertex) throws Exception {
        vertex.initialize(1, Time.milliseconds((long)1L), 1L, (SubtaskAttemptNumberStore)new DefaultSubtaskAttemptNumberStore(Collections.emptyList()), (CoordinatorStore)new CoordinatorStoreImpl(), UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
    }

    private static ExecutionJobVertex createDynamicExecutionJobVertex() throws Exception {
        return ExecutionJobVertexTest.createDynamicExecutionJobVertex(-1, -1, 1);
    }

    public static ExecutionJobVertex createDynamicExecutionJobVertex(int parallelism, int maxParallelism, int defaultMaxParallelism) throws Exception {
        JobVertex jobVertex = new JobVertex("testVertex");
        jobVertex.setInvokableClass(AbstractInvokable.class);
        jobVertex.getOrCreateResultDataSet(new IntermediateDataSetID(), ResultPartitionType.BLOCKING);
        if (maxParallelism > 0) {
            jobVertex.setMaxParallelism(maxParallelism);
        }
        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        }
        DefaultExecutionGraph eg = TestingDefaultExecutionGraphBuilder.newBuilder().build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        VertexParallelismStore vertexParallelismStore = AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(Collections.singletonList(jobVertex), (int)defaultMaxParallelism);
        VertexParallelismInformation vertexParallelismInfo = vertexParallelismStore.getParallelismInfo(jobVertex.getID());
        return new ExecutionJobVertex((InternalExecutionGraphAccessor)eg, jobVertex, vertexParallelismInfo);
    }
}

