/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.benchmark;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;

/**
 * Static helpers for the scheduler benchmarks: building job vertices, job graphs and execution
 * graphs from a {@link JobConfiguration}, deploying execution vertices into testing slots, and
 * pushing task state transitions into an {@link ExecutionGraph} or a {@link DefaultScheduler}.
 */
public class SchedulerBenchmarkUtils {

    /**
     * Creates a two-vertex topology: a {@code "source"} vertex and a {@code "sink"} vertex, with
     * the sink consuming the source's output.
     *
     * <p>Both vertices run {@link NoOpInvokable} and use the parallelism from the given
     * configuration. The edge between them uses the configuration's distribution pattern and
     * result partition type.
     *
     * @param jobConfiguration supplies parallelism, distribution pattern and result partition type
     * @return a new mutable list containing the source vertex followed by the sink vertex
     */
    public static List<JobVertex> createDefaultJobVertices(JobConfiguration jobConfiguration) {
        JobVertex source = createNoOpVertex("source", jobConfiguration);
        JobVertex sink = createNoOpVertex("sink", jobConfiguration);
        sink.connectNewDataSetAsInput(
                source,
                jobConfiguration.getDistributionPattern(),
                jobConfiguration.getResultPartitionType());

        List<JobVertex> jobVertices = new ArrayList<>();
        jobVertices.add(source);
        jobVertices.add(sink);
        return jobVertices;
    }

    /** Creates a {@link NoOpInvokable} vertex with the given name and the configured parallelism. */
    private static JobVertex createNoOpVertex(String name, JobConfiguration jobConfiguration) {
        JobVertex vertex = new JobVertex(name);
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(jobConfiguration.getParallelism());
        return vertex;
    }

    /**
     * Creates a job graph that contains no vertices and is configured from the given job
     * configuration.
     *
     * <p>NOTE(review): this overload passes an empty vertex list rather than the result of
     * {@link #createDefaultJobVertices(JobConfiguration)} - confirm that callers expect an empty
     * graph.
     *
     * @param jobConfiguration supplies the job type and execution mode
     * @return the configured, vertex-less job graph
     * @throws IOException declared by the delegate overload
     */
    public static JobGraph createJobGraph(JobConfiguration jobConfiguration) throws IOException {
        return createJobGraph(Collections.emptyList(), jobConfiguration);
    }

    /**
     * Creates a streaming job graph from the given vertices and applies the job type and
     * execution mode of the given configuration.
     *
     * @param jobVertices vertices to put into the job graph
     * @param jobConfiguration supplies the job type and execution mode
     * @return the configured job graph
     * @throws IOException kept in the signature for callers; nothing in this method body is
     *     visibly throwing it
     */
    public static JobGraph createJobGraph(List<JobVertex> jobVertices, JobConfiguration jobConfiguration) throws IOException {
        JobVertex[] vertexArray = jobVertices.toArray(new JobVertex[0]);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertexArray);
        jobGraph.setJobType(jobConfiguration.getJobType());

        // A fresh ExecutionConfig carrying only the configured execution mode.
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setExecutionMode(jobConfiguration.getExecutionMode());
        jobGraph.setExecutionConfig(executionConfig);
        return jobGraph;
    }

    /**
     * Builds a job graph from the given vertices, constructs a {@link DefaultScheduler} for it and
     * returns that scheduler's execution graph.
     *
     * <p>The supplied executor service is used as the scheduler's IO executor, future executor
     * and (wrapped in a {@link ScheduledExecutorServiceAdapter}) delay executor. The main thread
     * executor comes from {@link ComponentMainThreadExecutorServiceAdapter#forMainThread()}.
     *
     * @param jobVertices vertices of the job
     * @param jobConfiguration supplies the job type and execution mode
     * @param scheduledExecutorService executor handed to the scheduler builder
     * @return the execution graph owned by the newly built scheduler
     * @throws Exception if creating the job graph or building the scheduler fails
     */
    public static ExecutionGraph createAndInitExecutionGraph(List<JobVertex> jobVertices, JobConfiguration jobConfiguration, ScheduledExecutorService scheduledExecutorService) throws Exception {
        JobGraph jobGraph = createJobGraph(jobVertices, jobConfiguration);
        ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, scheduledExecutorService)
                        .setIoExecutor(scheduledExecutorService)
                        .setFutureExecutor(scheduledExecutorService)
                        .setDelayExecutor((ScheduledExecutor) new ScheduledExecutorServiceAdapter(scheduledExecutorService))
                        .build();
        return scheduler.getExecutionGraph();
    }

    /**
     * Deploys every execution vertex of one job vertex, each into its own freshly created
     * testing slot.
     *
     * @param executionGraph graph containing the job vertex
     * @param jobVertexID id of the job vertex whose subtasks are deployed
     * @param slotBuilder produces one testing slot per execution vertex
     * @throws JobException if assigning the slot or deploying fails
     * @throws ExecutionException if registering the produced partitions completes exceptionally
     * @throws InterruptedException if interrupted while waiting for partition registration
     */
    public static void deployTasks(ExecutionGraph executionGraph, JobVertexID jobVertexID, TestingLogicalSlotBuilder slotBuilder) throws JobException, ExecutionException, InterruptedException {
        for (ExecutionVertex vertex : executionGraph.getJobVertex(jobVertexID).getTaskVertices()) {
            deployVertex(vertex, slotBuilder);
        }
    }

    /**
     * Deploys every execution vertex of the execution graph, each into its own freshly created
     * testing slot.
     *
     * @param executionGraph graph whose execution vertices are deployed
     * @param slotBuilder produces one testing slot per execution vertex
     * @throws JobException if assigning the slot or deploying fails
     * @throws ExecutionException if registering the produced partitions completes exceptionally
     * @throws InterruptedException if interrupted while waiting for partition registration
     */
    public static void deployAllTasks(ExecutionGraph executionGraph, TestingLogicalSlotBuilder slotBuilder) throws JobException, ExecutionException, InterruptedException {
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            deployVertex(vertex, slotBuilder);
        }
    }

    /**
     * Runs the per-vertex deployment sequence shared by {@link #deployTasks} and
     * {@link #deployAllTasks}: create a slot, move the current execution attempt to
     * {@link ExecutionState#SCHEDULED}, wait for its produced partitions to be registered at the
     * slot's task manager location, then assign the slot and deploy.
     */
    private static void deployVertex(ExecutionVertex vertex, TestingLogicalSlotBuilder slotBuilder) throws JobException, ExecutionException, InterruptedException {
        TestingLogicalSlot slot = slotBuilder.createTestingLogicalSlot();
        Execution execution = vertex.getCurrentExecutionAttempt();
        execution.transitionState(ExecutionState.SCHEDULED);
        // Block until registration has finished before handing the slot to the vertex.
        execution.registerProducedPartitions(slot.getTaskManagerLocation()).get();
        assignResourceAndDeploy(vertex, slot);
    }

    /** Assigns the slot to the vertex (the result of the attempt is not checked) and deploys it. */
    private static void assignResourceAndDeploy(ExecutionVertex vertex, LogicalSlot slot) throws JobException {
        vertex.tryAssignResource(slot);
        vertex.deploy();
    }

    /**
     * Reports the given state for the current execution attempt of every execution vertex of one
     * job vertex, directly to the execution graph.
     *
     * @param executionGraph graph that receives the state updates
     * @param jobVertexID id of the job vertex whose subtasks are transitioned
     * @param state state to report for each subtask
     */
    public static void transitionTaskStatus(ExecutionGraph executionGraph, JobVertexID jobVertexID, ExecutionState state) {
        for (ExecutionVertex vertex : executionGraph.getJobVertex(jobVertexID).getTaskVertices()) {
            ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
            TaskExecutionState taskState = new TaskExecutionState(attemptId, state);
            executionGraph.updateState(new TaskExecutionStateTransition(taskState));
        }
    }

    /**
     * Reports the given state for the current execution attempt of a single subtask, through the
     * scheduler.
     *
     * @param scheduler scheduler that receives the state update
     * @param vertex job vertex containing the subtask
     * @param subtask index into the job vertex's task vertices; not range-checked here
     * @param executionState state to report for the subtask
     */
    public static void transitionTaskStatus(DefaultScheduler scheduler, AccessExecutionJobVertex vertex, int subtask, ExecutionState executionState) {
        ExecutionAttemptID attemptId = vertex.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptId, executionState));
    }
}

