/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent;
import org.apache.flink.runtime.jobmaster.event.JobEvent;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.ExecutionPlanSchedulingContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class AdaptiveExecutionPlanSchedulingContextTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    AdaptiveExecutionPlanSchedulingContextTest() {
    }

    @Test
    void testGetParallelismAndMaxParallelism() throws DynamicCodeLoadingException {
        int sinkParallelism = 4;
        int sinkMaxParallelism = 5;
        DefaultAdaptiveExecutionHandler adaptiveExecutionHandler = AdaptiveExecutionPlanSchedulingContextTest.getDefaultAdaptiveExecutionHandler(sinkParallelism, sinkMaxParallelism);
        ExecutionPlanSchedulingContext schedulingContext = adaptiveExecutionHandler.createExecutionPlanSchedulingContext(100);
        JobGraph jobGraph = adaptiveExecutionHandler.getJobGraph();
        JobVertex sourceVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        IntermediateDataSet sourceDataSet = (IntermediateDataSet)sourceVertex.getProducedDataSets().get(0);
        Assertions.assertThat((int)schedulingContext.getConsumersParallelism(id -> 123, sourceDataSet)).isEqualTo(sinkParallelism);
        Assertions.assertThat((int)schedulingContext.getConsumersMaxParallelism(id -> 456, sourceDataSet)).isEqualTo(sinkMaxParallelism);
        adaptiveExecutionHandler.handleJobEvent((JobEvent)new ExecutionJobVertexFinishedEvent(sourceVertex.getID(), Collections.emptyMap()));
        Assertions.assertThat((int)schedulingContext.getConsumersParallelism(id -> 123, sourceDataSet)).isEqualTo(123);
        Assertions.assertThat((int)schedulingContext.getConsumersMaxParallelism(id -> 456, sourceDataSet)).isEqualTo(456);
    }

    @Test
    void testGetDefaultMaxParallelismWhenParallelismGreaterThanZero() throws DynamicCodeLoadingException {
        int sinkParallelism = 4;
        int sinkMaxParallelism = -1;
        int defaultMaxParallelism = 100;
        DefaultAdaptiveExecutionHandler adaptiveExecutionHandler = AdaptiveExecutionPlanSchedulingContextTest.getDefaultAdaptiveExecutionHandler(sinkParallelism, sinkMaxParallelism);
        ExecutionPlanSchedulingContext schedulingContext = adaptiveExecutionHandler.createExecutionPlanSchedulingContext(defaultMaxParallelism);
        JobGraph jobGraph = adaptiveExecutionHandler.getJobGraph();
        JobVertex sourceVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        IntermediateDataSet dataSet = (IntermediateDataSet)sourceVertex.getProducedDataSets().get(0);
        Assertions.assertThat((int)schedulingContext.getConsumersMaxParallelism(id -> 123, dataSet)).isEqualTo(SchedulerBase.getDefaultMaxParallelism((int)sinkParallelism));
    }

    @Test
    void testGetDefaultMaxParallelismWhenParallelismLessThanZero() throws DynamicCodeLoadingException {
        int sinkParallelism = -1;
        int sinkMaxParallelism = -1;
        int defaultMaxParallelism = 100;
        DefaultAdaptiveExecutionHandler adaptiveExecutionHandler = AdaptiveExecutionPlanSchedulingContextTest.getDefaultAdaptiveExecutionHandler(sinkParallelism, sinkMaxParallelism);
        ExecutionPlanSchedulingContext schedulingContext = adaptiveExecutionHandler.createExecutionPlanSchedulingContext(defaultMaxParallelism);
        JobGraph jobGraph = adaptiveExecutionHandler.getJobGraph();
        JobVertex sourceVertex = (JobVertex)jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
        IntermediateDataSet dataSet = (IntermediateDataSet)sourceVertex.getProducedDataSets().get(0);
        Assertions.assertThat((int)schedulingContext.getConsumersMaxParallelism(id -> 123, dataSet)).isEqualTo(defaultMaxParallelism);
    }

    @Test
    public void testGetPendingOperatorCount() throws DynamicCodeLoadingException {
        DefaultAdaptiveExecutionHandler adaptiveExecutionHandler = AdaptiveExecutionPlanSchedulingContextTest.getDefaultAdaptiveExecutionHandler();
        ExecutionPlanSchedulingContext schedulingContext = adaptiveExecutionHandler.createExecutionPlanSchedulingContext(1);
        Assertions.assertThat((int)schedulingContext.getPendingOperatorCount()).isEqualTo(1);
        JobGraph jobGraph = adaptiveExecutionHandler.getJobGraph();
        JobVertex source = (JobVertex)jobGraph.getVertices().iterator().next();
        adaptiveExecutionHandler.handleJobEvent((JobEvent)new ExecutionJobVertexFinishedEvent(source.getID(), Collections.emptyMap()));
        Assertions.assertThat((int)schedulingContext.getPendingOperatorCount()).isEqualTo(0);
    }

    private static DefaultAdaptiveExecutionHandler getDefaultAdaptiveExecutionHandler() throws DynamicCodeLoadingException {
        return AdaptiveExecutionPlanSchedulingContextTest.getDefaultAdaptiveExecutionHandler(2, 2);
    }

    private static DefaultAdaptiveExecutionHandler getDefaultAdaptiveExecutionHandler(int sinkParallelism, int sinkMaxParallelism) throws DynamicCodeLoadingException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0L, 1L).disableChaining().print();
        StreamGraph streamGraph = env.getStreamGraph();
        for (StreamNode streamNode : streamGraph.getStreamNodes()) {
            if (!streamNode.getOperatorName().contains("Sink")) continue;
            streamNode.setParallelism(Integer.valueOf(sinkParallelism));
            if (sinkMaxParallelism <= 0) continue;
            streamNode.setMaxParallelism(sinkMaxParallelism);
        }
        return new DefaultAdaptiveExecutionHandler(Thread.currentThread().getContextClassLoader(), streamGraph, (Executor)EXECUTOR_RESOURCE.getExecutor());
    }
}

