/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Integration tests that run jobs with the adaptive scheduler on a {@link MiniCluster} and verify
 * that the parallelism of a job vertex follows the number of available slots.
 *
 * <p>The cluster is created through a JUnit {@link Rule} with {@link #NUMBER_TASK_MANAGERS} task
 * managers, each offering {@link #NUMBER_SLOTS_PER_TASK_MANAGER} slots.
 */
public class AdaptiveSchedulerClusterITCase
extends TestLogger {
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
    private static final int NUMBER_TASK_MANAGERS = 2;
    private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;
    private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();

    // Must be initialised before miniClusterResource, which reads it in its initializer.
    private final Configuration configuration = createConfiguration();

    @Rule
    public final MiniClusterResource miniClusterResource =
            new MiniClusterResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(configuration)
                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
                            .build());

    /**
     * Builds the cluster configuration: selects the adaptive scheduler, sets the resource
     * stabilization timeout and the minimum scaling interval to one millisecond each, and sets
     * the checkpoint history size to {@link Integer#MAX_VALUE}.
     *
     * @return the configuration handed to the mini cluster
     */
    private Configuration createConfiguration() {
        final Configuration conf = new Configuration();
        conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        conf.set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(1L));
        conf.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(1L));
        // presumably large enough that no checkpoint is dropped from the history during a test
        conf.set(WebOptions.CHECKPOINTS_HISTORY_SIZE, Integer.MAX_VALUE);
        return conf;
    }

    /** Resets {@link OnceBlockingNoOpInvokable} before every test. */
    @Before
    public void setUp() {
        OnceBlockingNoOpInvokable.reset();
    }

    /**
     * Submits a blocking job at full cluster parallelism, terminates one task manager and expects
     * the vertex parallelism to drop to the slots of the remaining task managers before the job
     * is unblocked and finishes successfully.
     */
    @Test
    public void testAutomaticScaleDownInCaseOfLostSlots() throws Exception {
        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);

        miniCluster.submitJob(jobGraph).join();
        final CompletableFuture<JobResult> resultFuture =
                miniCluster.requestJobResult(jobGraph.getJobID());

        waitUntilParallelismForVertexReached(
                jobGraph.getJobID(),
                JOB_VERTEX_ID,
                NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS);

        miniCluster.terminateTaskManager(0);

        waitUntilParallelismForVertexReached(
                jobGraph.getJobID(),
                JOB_VERTEX_ID,
                NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS - 1));

        OnceBlockingNoOpInvokable.unblock();

        final JobResult jobResult = resultFuture.join();
        Assert.assertTrue(jobResult.isSuccess());
    }

    /**
     * Submits a blocking job that requests more slots than the cluster initially offers, waits
     * until it runs with all initially available slots, then starts one more task manager and
     * expects the vertex to reach the requested parallelism.
     */
    @Test
    public void testAutomaticScaleUp() throws Exception {
        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
        final int initialInstanceCount = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;
        final int targetInstanceCount = initialInstanceCount + NUMBER_SLOTS_PER_TASK_MANAGER;
        final JobGraph jobGraph = createBlockingJobGraph(targetInstanceCount);

        // NOTE(review): the message says "only one TM", but the rule above starts
        // NUMBER_TASK_MANAGERS (2) task managers; message text kept unchanged.
        log.info("Submitting job with parallelism of " + targetInstanceCount + ", to a cluster with only one TM.");
        miniCluster.submitJob(jobGraph).join();
        final CompletableFuture<JobResult> jobResultFuture =
                miniCluster.requestJobResult(jobGraph.getJobID());

        waitUntilParallelismForVertexReached(
                jobGraph.getJobID(), JOB_VERTEX_ID, initialInstanceCount);

        log.info("Start additional TaskManager to scale up to the full parallelism.");
        miniCluster.startTaskManager();

        log.info("Waiting until Invokable is running with higher parallelism");
        waitUntilParallelismForVertexReached(
                jobGraph.getJobID(), JOB_VERTEX_ID, targetInstanceCount);

        OnceBlockingNoOpInvokable.unblock();

        Assert.assertTrue(jobResultFuture.join().isSuccess());
    }

    /**
     * Runs a checkpointing job, waits for at least one completed checkpoint, forces a scale-down
     * by terminating a task manager and then checks that the last entry of the checkpoint history
     * still carries checkpoint id 1.
     */
    @Test
    public void testCheckpointStatsPersistedAcrossRescale() throws Exception {
        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();

        final JobVertex jobVertex = new JobVertex("jobVertex", JOB_VERTEX_ID);
        jobVertex.setInvokableClass(CheckpointingNoOpInvokable.class);
        jobVertex.setParallelism(PARALLELISM);

        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
        jobGraph.setSnapshotSettings(
                new JobCheckpointingSettings(
                        CheckpointCoordinatorConfiguration.builder()
                                .setCheckpointInterval(100L)
                                .setCheckpointTimeout(1000L)
                                .build(),
                        null));

        miniCluster.submitJob(jobGraph).join();

        // Poll the execution graph until the first checkpoint has completed.
        CommonTestUtils.waitUntilCondition(
                () ->
                        miniCluster
                                .getExecutionGraph(jobGraph.getJobID())
                                .thenApply(
                                        eg ->
                                                eg.getCheckpointStatsSnapshot()
                                                                .getCounts()
                                                                .getNumberOfCompletedCheckpoints()
                                                        > 0L)
                                .get());

        miniCluster.terminateTaskManager(0);

        waitUntilParallelismForVertexReached(
                jobGraph.getJobID(),
                JOB_VERTEX_ID,
                NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS - 1));

        final List<AbstractCheckpointStats> checkpointHistory =
                miniCluster
                        .getExecutionGraph(jobGraph.getJobID())
                        .thenApply(
                                eg -> eg.getCheckpointStatsSnapshot().getHistory().getCheckpoints())
                        .get();

        // presumably the history is ordered newest-first, so the last element is the very first
        // checkpoint -- TODO confirm against the checkpoint stats history implementation
        MatcherAssert.assertThat(
                checkpointHistory.get(checkpointHistory.size() - 1).getCheckpointId(),
                Is.is(1L));
    }

    /**
     * Creates a streaming job graph with a single {@link OnceBlockingNoOpInvokable} vertex
     * identified by {@link #JOB_VERTEX_ID}, using a fixed-delay restart strategy with one restart
     * attempt and zero delay.
     *
     * @param parallelism parallelism set on the single job vertex
     * @return the job graph ready for submission
     * @throws IOException if the execution config cannot be attached to the job graph
     */
    private JobGraph createBlockingJobGraph(int parallelism) throws IOException {
        final JobVertex blockingOperator = new JobVertex("Blocking operator", JOB_VERTEX_ID);
        blockingOperator.setInvokableClass(OnceBlockingNoOpInvokable.class);
        blockingOperator.setParallelism(parallelism);

        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(blockingOperator);

        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        jobGraph.setExecutionConfig(executionConfig);

        return jobGraph;
    }

    /**
     * Polls the archived execution graph of the given job until the given vertex reports the
     * target parallelism.
     *
     * @param jobId id of the job to inspect
     * @param jobVertexId id of the vertex whose parallelism is checked
     * @param targetParallelism parallelism to wait for
     * @throws Exception if polling fails or the wait utility gives up
     */
    private void waitUntilParallelismForVertexReached(JobID jobId, JobVertexID jobVertexId, int targetParallelism) throws Exception {
        CommonTestUtils.waitUntilCondition(
                () -> {
                    final ArchivedExecutionGraph archivedExecutionGraph =
                            miniClusterResource
                                    .getMiniCluster()
                                    .getArchivedExecutionGraph(jobId)
                                    .get();

                    final AccessExecutionJobVertex executionJobVertex =
                            archivedExecutionGraph.getAllVertices().get(jobVertexId);

                    if (executionJobVertex == null) {
                        // the vertex is not part of the graph yet; keep polling
                        return false;
                    }

                    return executionJobVertex.getParallelism() == targetParallelism;
                });
    }

    /**
     * Invokable that does no work besides acknowledging every checkpoint it is asked to trigger,
     * until it is cancelled.
     */
    public static class CheckpointingNoOpInvokable
    extends AbstractInvokable {
        /** Sentinel put into the queue by {@link #cancel()} to end the loop in {@link #invoke()}. */
        private static final long CANCEL_SIGNAL = -2L;

        // Capacity is 1 and producers use add(), which throws IllegalStateException when the
        // queue is full; this relies on invoke() draining entries promptly.
        private final BlockingQueue<Long> checkpointsToConfirm = new ArrayBlockingQueue<>(1);

        public CheckpointingNoOpInvokable(Environment environment) {
            super(environment);
        }

        /** Blocks on the queue and acknowledges each checkpoint id until the cancel signal arrives. */
        @Override
        public void invoke() throws Exception {
            long signal = checkpointsToConfirm.take();
            while (signal != CANCEL_SIGNAL) {
                getEnvironment().acknowledgeCheckpoint(signal, new CheckpointMetrics());
                signal = checkpointsToConfirm.take();
            }
        }

        /** Enqueues the cancel signal so that {@link #invoke()} returns. */
        @Override
        public void cancel() throws Exception {
            checkpointsToConfirm.add(CANCEL_SIGNAL);
        }

        /**
         * Enqueues the checkpoint id for acknowledgement by {@link #invoke()}.
         *
         * @return an already completed future holding {@code true}
         */
        @Override
        public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            checkpointsToConfirm.add(checkpointMetaData.getCheckpointId());
            return CompletableFuture.completedFuture(true);
        }

        /** No-op; returns an already completed future. */
        @Override
        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            return CompletableFuture.completedFuture(null);
        }
    }
}

