/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;

public class AdaptiveSchedulerClusterITCase
extends TestLogger {
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
    private static final int NUMBER_TASK_MANAGERS = 2;
    private static final int PARALLELISM = 4;
    private final Configuration configuration = this.createConfiguration();
    @Rule
    public final MiniClusterResource miniClusterResource = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(this.configuration).setNumberSlotsPerTaskManager(2).setNumberTaskManagers(2).build());

    private Configuration createConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER, (Object)JobManagerOptions.SchedulerType.Adaptive);
        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, (Object)true);
        configuration.set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, (Object)Duration.ofMillis(100L));
        return configuration;
    }

    @Test
    public void testAutomaticScaleDownInCaseOfLostSlots() throws InterruptedException, IOException {
        Assume.assumeTrue((boolean)ClusterOptions.isDeclarativeResourceManagementEnabled((Configuration)this.configuration));
        MiniCluster miniCluster = this.miniClusterResource.getMiniCluster();
        JobGraph jobGraph = this.createBlockingJobGraph(4);
        miniCluster.submitJob(jobGraph).join();
        CompletableFuture resultFuture = miniCluster.requestJobResult(jobGraph.getJobID());
        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
        miniCluster.terminateTaskManager(0);
        JobResult jobResult = (JobResult)resultFuture.join();
        Assert.assertTrue((boolean)jobResult.isSuccess());
    }

    @Test
    public void testAutomaticScaleUp() throws Exception {
        Assume.assumeTrue((boolean)ClusterOptions.isDeclarativeResourceManagementEnabled((Configuration)this.configuration));
        MiniCluster miniCluster = this.miniClusterResource.getMiniCluster();
        int targetInstanceCount = 6;
        JobGraph jobGraph = this.createBlockingJobGraph(targetInstanceCount);
        OnceBlockingNoOpInvokable.resetFor(4);
        this.log.info("Submitting job with parallelism of " + targetInstanceCount + ", to a cluster with only one TM.");
        miniCluster.submitJob(jobGraph).join();
        CompletableFuture jobResultFuture = miniCluster.requestJobResult(jobGraph.getJobID());
        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
        this.log.info("Start additional TaskManager to scale up to the full parallelism.");
        OnceBlockingNoOpInvokable.resetInstanceCount();
        OnceBlockingNoOpInvokable.resetFor(targetInstanceCount);
        miniCluster.startTaskManager();
        this.log.info("Waiting until Invokable is running with higher parallelism");
        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();
        Assert.assertEquals((long)targetInstanceCount, (long)OnceBlockingNoOpInvokable.getInstanceCount());
        Assert.assertTrue((boolean)((JobResult)jobResultFuture.join()).isSuccess());
    }

    private JobGraph createBlockingJobGraph(int parallelism) throws IOException {
        JobVertex blockingOperator = new JobVertex("Blocking operator");
        OnceBlockingNoOpInvokable.resetFor(parallelism);
        blockingOperator.setInvokableClass(OnceBlockingNoOpInvokable.class);
        blockingOperator.setParallelism(parallelism);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(blockingOperator);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        jobGraph.setExecutionConfig(executionConfig);
        return jobGraph;
    }
}

