/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.LocalInputPreferredSlotSharingStrategy;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SlotSharingStrategy;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;
import org.junit.Assert;

/**
 * Static helpers for tests that need to create a {@link DefaultScheduler}, drive the state of its
 * executions, or trigger and acknowledge checkpoints through its {@link CheckpointCoordinator}.
 *
 * <p>The class only contains static methods and cannot be instantiated.
 */
public class SchedulerTestingUtils {
    private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 600000L;
    private static final Time DEFAULT_TIMEOUT = Time.seconds(300L);

    /** Not instantiable: all members are static. */
    private SchedulerTestingUtils() {
    }

    /**
     * Builds a {@link DefaultScheduler} for the given job graph using a {@link
     * DefaultSchedulerBuilder} with its default settings.
     *
     * @param jobGraph job graph handed to the builder
     * @param mainThreadExecutor main thread executor handed to the builder
     * @param executorService executor service handed to the builder
     * @return the scheduler produced by {@link DefaultSchedulerBuilder#build()}
     * @throws Exception if building the scheduler fails
     */
    public static DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor, ScheduledExecutorService executorService) throws Exception {
        return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, executorService).build();
    }

    /**
     * Installs checkpointing settings on the job graph without a state backend or checkpoint
     * storage.
     *
     * @param jobGraph job graph whose snapshot settings are replaced
     */
    public static void enableCheckpointing(JobGraph jobGraph) {
        enableCheckpointing(jobGraph, null, null);
    }

    /**
     * Installs checkpointing settings on the job graph, optionally carrying a serialized state
     * backend and checkpoint storage.
     *
     * @param jobGraph job graph whose snapshot settings are replaced
     * @param stateBackend state backend to serialize into the settings, or {@code null} for none
     * @param checkpointStorage checkpoint storage to serialize into the settings, or {@code null}
     *     for none
     * @throws RuntimeException if the state backend or the checkpoint storage cannot be serialized
     */
    public static void enableCheckpointing(JobGraph jobGraph, @Nullable StateBackend stateBackend, @Nullable CheckpointStorage checkpointStorage) {
        // NOTE(review): the first argument is presumably the checkpoint interval and the second the
        // checkpoint timeout (the constant has the value that used to be hard-coded here) -
        // confirm against the CheckpointCoordinatorConfiguration constructor.
        CheckpointCoordinatorConfiguration config =
                new CheckpointCoordinatorConfiguration(
                        Long.MAX_VALUE,
                        DEFAULT_CHECKPOINT_TIMEOUT_MS,
                        0L,
                        1,
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                        false,
                        false,
                        0,
                        0L);

        SerializedValue<StateBackend> serializedStateBackend =
                serializeOrNull(stateBackend, "could not serialize state backend");
        SerializedValue<CheckpointStorage> serializedCheckpointStorage =
                serializeOrNull(checkpointStorage, "could not serialize checkpoint storage");

        jobGraph.setSnapshotSettings(
                new JobCheckpointingSettings(
                        config,
                        serializedStateBackend,
                        TernaryBoolean.UNDEFINED,
                        serializedCheckpointStorage,
                        null));
    }

    /**
     * Wraps {@code value} in a {@link SerializedValue}, passing {@code null} through untouched.
     *
     * @param value object to serialize, may be {@code null}
     * @param failureMessage message of the {@link RuntimeException} thrown when serialization fails
     * @return the serialized value, or {@code null} if {@code value} is {@code null}
     */
    @Nullable
    private static <T> SerializedValue<T> serializeOrNull(@Nullable T value, String failureMessage) {
        if (value == null) {
            return null;
        }
        try {
            return new SerializedValue<>(value);
        } catch (IOException e) {
            throw new RuntimeException(failureMessage, e);
        }
    }

    /**
     * Collects the attempt id of the current execution attempt of every execution vertex found in
     * the archived execution graph returned by {@code scheduler.requestJob()}.
     *
     * @param scheduler scheduler to query
     * @return the attempt ids, one per execution vertex
     */
    public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(DefaultScheduler scheduler) {
        return StreamSupport.stream(
                        scheduler
                                .requestJob()
                                .getArchivedExecutionGraph()
                                .getAllExecutionVertices()
                                .spliterator(),
                        false)
                .map(vertex -> vertex.getCurrentExecutionAttempt().getAttemptId())
                .collect(Collectors.toList());
    }

    /**
     * Returns the state of the current execution attempt of one subtask.
     *
     * @param scheduler scheduler owning the execution
     * @param jvid id of the job vertex
     * @param subtask index into the job vertex's task vertices
     * @return the state of the subtask's current execution attempt
     */
    public static ExecutionState getExecutionState(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
        return ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getState();
    }

    /**
     * Reports the current attempt of the given subtask as {@link ExecutionState#FAILED} with a
     * synthetic exception as the cause.
     *
     * @param scheduler scheduler to notify
     * @param jvid id of the job vertex
     * @param subtask index into the job vertex's task vertices
     */
    public static void failExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(
                        attemptID, ExecutionState.FAILED, new Exception("test task failure")));
    }

    /**
     * Reports the current attempt of the given subtask as {@link ExecutionState#CANCELED}.
     *
     * @param scheduler scheduler to notify
     * @param jvid id of the job vertex
     * @param subtask index into the job vertex's task vertices
     */
    public static void canceledExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
        // NOTE(review): the attached cause reuses the "test task failure" text of failExecution;
        // kept unchanged because callers may depend on it.
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(
                        attemptID, ExecutionState.CANCELED, new Exception("test task failure")));
    }

    /**
     * Reports the current attempt of the given subtask as being in {@code executionState}.
     *
     * @param executionState state to report
     * @param scheduler scheduler to notify
     * @param jvid id of the job vertex
     * @param subtask index into the job vertex's task vertices
     */
    public static void setExecutionToState(ExecutionState executionState, DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptID, executionState));
    }

    /**
     * Reports every current execution attempt first as {@link ExecutionState#INITIALIZING} and then
     * as {@link ExecutionState#RUNNING}. The results of the updates are not checked.
     *
     * @param scheduler scheduler to notify
     */
    public static void setAllExecutionsToRunning(DefaultScheduler scheduler) {
        getAllCurrentExecutionAttempts(scheduler)
                .forEach(
                        attemptId -> {
                            scheduler.updateTaskExecutionState(
                                    new TaskExecutionState(attemptId, ExecutionState.INITIALIZING));
                            scheduler.updateTaskExecutionState(
                                    new TaskExecutionState(attemptId, ExecutionState.RUNNING));
                        });
    }

    /**
     * Reports every current execution attempt as {@link ExecutionState#CANCELED} and asserts that
     * the scheduler accepted each update.
     *
     * @param scheduler scheduler to notify
     */
    public static void setAllExecutionsToCancelled(DefaultScheduler scheduler) {
        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) {
            boolean setToCanceled =
                    scheduler.updateTaskExecutionState(
                            new TaskExecutionState(attemptId, ExecutionState.CANCELED));
            Assert.assertTrue("could not switch task to CANCELED", setToCanceled);
        }
    }

    /**
     * Sends an {@link AcknowledgeCheckpoint} message for the given checkpoint id to the checkpoint
     * coordinator on behalf of every current execution attempt.
     *
     * @param scheduler scheduler whose checkpoint coordinator receives the messages
     * @param checkpointId id of the checkpoint to acknowledge
     * @throws CheckpointException if the coordinator rejects an acknowledge message
     */
    public static void acknowledgePendingCheckpoint(DefaultScheduler scheduler, long checkpointId) throws CheckpointException {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
        JobID jid = scheduler.getJobId();
        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) {
            AcknowledgeCheckpoint acknowledgeCheckpoint =
                    new AcknowledgeCheckpoint(jid, attemptId, checkpointId);
            checkpointCoordinator.receiveAcknowledgeMessage(
                    acknowledgeCheckpoint, "Unknown location");
        }
    }

    /**
     * Asks the scheduler's checkpoint coordinator to trigger a checkpoint.
     *
     * @param scheduler scheduler whose checkpoint coordinator is used
     * @return the future returned by {@code CheckpointCoordinator.triggerCheckpoint(false)}
     * @throws Exception declared for callers; propagated from the coordinator
     */
    public static CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
        return checkpointCoordinator.triggerCheckpoint(false);
    }

    /**
     * Acknowledges the single pending checkpoint for every current execution attempt.
     *
     * <p>Asserts that exactly one checkpoint is pending, then polls (sleeping 1 ms per round) until
     * that checkpoint reports no non-acknowledged operator coordinators, and finally calls {@code
     * scheduler.acknowledgeCheckpoint} for each attempt. If the polling thread is interrupted, the
     * interrupt flag is restored and the test is failed.
     *
     * @param scheduler scheduler whose pending checkpoint is acknowledged
     */
    public static void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
        Assert.assertEquals(
                "Coordinator does not have exactly one pending checkpoint",
                1,
                checkpointCoordinator.getNumberOfPendingCheckpoints());

        PendingCheckpoint pc =
                checkpointCoordinator.getPendingCheckpoints().values().iterator().next();

        // There is no upper bound on this wait; it ends only when the count drops to zero or the
        // thread is interrupted (Assert.fail throws and leaves the loop).
        while (pc.getNumberOfNonAcknowledgedOperatorCoordinators() > 0) {
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                Assert.fail("interrupted");
            }
        }

        getAllCurrentExecutionAttempts(scheduler)
                .forEach(
                        attemptId ->
                                scheduler.acknowledgeCheckpoint(
                                        pc.getJobId(),
                                        attemptId,
                                        pc.getCheckpointId(),
                                        new CheckpointMetrics(),
                                        null));
    }

    /**
     * Triggers a checkpoint, acknowledges it for every current execution attempt and returns the
     * completed checkpoint.
     *
     * <p>Asserts that exactly one checkpoint is pending after triggering and that the pending
     * checkpoint's completion future already holds a result once all acknowledgements were sent.
     *
     * @param scheduler scheduler whose checkpoint coordinator is used
     * @return the completed checkpoint
     * @throws Exception if triggering or acknowledging the checkpoint fails
     */
    public static CompletedCheckpoint takeCheckpoint(DefaultScheduler scheduler) throws Exception {
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
        checkpointCoordinator.triggerCheckpoint(false);
        Assert.assertEquals(
                "test setup inconsistent",
                1,
                checkpointCoordinator.getNumberOfPendingCheckpoints());

        PendingCheckpoint checkpoint =
                checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
        // Typed future: with a raw CompletableFuture, getNow(...) would only yield Object.
        CompletableFuture<CompletedCheckpoint> future = checkpoint.getCompletionFuture();

        acknowledgePendingCheckpoint(scheduler, checkpoint.getCheckpointId());

        // getNow(null) does not block; a null result means the checkpoint did not complete.
        CompletedCheckpoint completed = future.getNow(null);
        Assert.assertNotNull("checkpoint not complete", completed);
        return completed;
    }

    /**
     * Returns the checkpoint coordinator of the given scheduler.
     *
     * @param scheduler scheduler to query
     * @return the result of {@code scheduler.getCheckpointCoordinator()}
     */
    public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase scheduler) {
        return scheduler.getCheckpointCoordinator();
    }

    /**
     * Resolves the {@link ExecutionJobVertex} of a job vertex by looking up its subtask 0 in the
     * scheduler and asking that execution vertex for its job vertex.
     */
    private static ExecutionJobVertex getJobVertex(DefaultScheduler scheduler, JobVertexID jobVertexId) {
        ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0);
        return scheduler.getExecutionVertex(id).getJobVertex();
    }

    /**
     * Returns the attempt id of the current execution attempt of one subtask.
     *
     * @param scheduler scheduler owning the execution
     * @param jvid id of the job vertex
     * @param subtask index into the job vertex's task vertices
     * @return the attempt id of the subtask's current execution attempt
     */
    public static ExecutionAttemptID getAttemptId(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
        ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
        assert ejv != null;
        return ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
    }

    /**
     * Creates a slot sharing allocator factory backed by {@link
     * TestingPhysicalSlotProvider#createWithInfiniteSlotCreation()} and the default timeout.
     *
     * @return the new factory
     */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory() {
        return newSlotSharingExecutionSlotAllocatorFactory(
                TestingPhysicalSlotProvider.createWithInfiniteSlotCreation());
    }

    /**
     * Creates a slot sharing allocator factory for the given slot provider with the default
     * timeout.
     *
     * @param physicalSlotProvider slot provider handed to the factory
     * @return the new factory
     */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory(PhysicalSlotProvider physicalSlotProvider) {
        return newSlotSharingExecutionSlotAllocatorFactory(physicalSlotProvider, DEFAULT_TIMEOUT);
    }

    /**
     * Creates a slot sharing allocator factory for the given slot provider and allocation timeout,
     * using a {@link TestingPhysicalSlotRequestBulkChecker} and a {@link
     * LocalInputPreferredSlotSharingStrategy.Factory}.
     *
     * @param physicalSlotProvider slot provider handed to the factory
     * @param allocationTimeout allocation timeout handed to the factory
     * @return the new factory
     */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory(PhysicalSlotProvider physicalSlotProvider, Time allocationTimeout) {
        return new SlotSharingExecutionSlotAllocatorFactory(
                physicalSlotProvider,
                true,
                new TestingPhysicalSlotRequestBulkChecker(),
                allocationTimeout,
                new LocalInputPreferredSlotSharingStrategy.Factory());
    }

    /**
     * Creates a scheduler for a batch job made of one producer and several consumers and deploys
     * all of their tasks.
     *
     * <p>Every consumer is connected to the producer through the same {@link
     * ResultPartitionType#BLOCKING} data set. On the main thread executor the producer's tasks are
     * deployed and the producer vertex is finished, then each consumer's tasks are deployed. When
     * {@code isAdaptive} is set, each job vertex is initialized in the execution graph before its
     * tasks are deployed. The call blocks until that work has completed.
     *
     * @param isAdaptive whether to build the adaptive batch job scheduler
     * @param jobId id of the job graph to create
     * @param producer producing job vertex
     * @param consumers consuming job vertices
     * @param distributionPattern pattern used to connect each consumer to the producer
     * @param blobWriter blob writer handed to the scheduler builder
     * @param mainThreadExecutor executor on which the deployment runs
     * @param ioExecutor IO executor handed to the scheduler builder
     * @param partitionTracker partition tracker handed to the scheduler builder
     * @param scheduledExecutor executor service handed to the scheduler builder
     * @return the created scheduler
     * @throws Exception if creating the scheduler fails
     */
    public static SchedulerBase createSchedulerAndDeploy(boolean isAdaptive, JobID jobId, JobVertex producer, JobVertex[] consumers, DistributionPattern distributionPattern, BlobWriter blobWriter, ComponentMainThreadExecutor mainThreadExecutor, ScheduledExecutorService ioExecutor, JobMasterPartitionTracker partitionTracker, ScheduledExecutorService scheduledExecutor) throws Exception {
        List<JobVertex> vertices = new ArrayList<>(Collections.singletonList(producer));
        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
        for (JobVertex consumer : consumers) {
            consumer.connectNewDataSetAsInput(
                    producer, distributionPattern, ResultPartitionType.BLOCKING, dataSetId, false);
            vertices.add(consumer);
        }

        DefaultScheduler scheduler =
                createScheduler(
                        isAdaptive,
                        jobId,
                        vertices,
                        blobWriter,
                        mainThreadExecutor,
                        ioExecutor,
                        partitionTracker,
                        scheduledExecutor);
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        TestingLogicalSlotBuilder slotBuilder = new TestingLogicalSlotBuilder();

        // Deployment is submitted to the main thread executor; join() waits for it to finish and
        // surfaces any failure of the submitted task.
        CompletableFuture.runAsync(
                        () -> {
                            try {
                                if (isAdaptive) {
                                    initializeExecutionJobVertex(producer.getID(), executionGraph);
                                }
                                deployTasks(executionGraph, producer.getID(), slotBuilder);
                                ExecutionGraphTestUtils.finishJobVertex(
                                        executionGraph, producer.getID());
                                for (JobVertex consumer : consumers) {
                                    if (isAdaptive) {
                                        initializeExecutionJobVertex(
                                                consumer.getID(), executionGraph);
                                    }
                                    deployTasks(executionGraph, consumer.getID(), slotBuilder);
                                }
                            } catch (Exception e) {
                                throw new RuntimeException("Exceptions shouldn't happen here.", e);
                            }
                        },
                        mainThreadExecutor)
                .join();
        return scheduler;
    }

    /**
     * Initializes the given job vertex in the execution graph with the current system time and
     * notifies the graph about it, rethrowing a {@link JobException} as a {@link RuntimeException}.
     */
    private static void initializeExecutionJobVertex(JobVertexID jobVertex, ExecutionGraph executionGraph) {
        try {
            ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertex);
            executionGraph.initializeJobVertex(executionJobVertex, System.currentTimeMillis());
            executionGraph.notifyNewlyInitializedJobVertices(
                    Collections.singletonList(executionJobVertex));
        } catch (JobException exception) {
            throw new RuntimeException(exception);
        }
    }

    /**
     * Builds a batch job graph from the given vertices and a scheduler for it, choosing between the
     * adaptive batch job scheduler and the default scheduler based on {@code isAdaptive}.
     */
    private static DefaultScheduler createScheduler(boolean isAdaptive, JobID jobId, List<JobVertex> jobVertices, BlobWriter blobWriter, ComponentMainThreadExecutor mainThreadExecutor, ScheduledExecutorService ioExecutor, JobMasterPartitionTracker partitionTracker, ScheduledExecutorService scheduledExecutor) throws Exception {
        JobGraph jobGraph =
                JobGraphBuilder.newBatchJobGraphBuilder()
                        .setJobId(jobId)
                        .addJobVertices(jobVertices)
                        .build();
        DefaultSchedulerBuilder builder =
                new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, scheduledExecutor)
                        .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L))
                        .setBlobWriter(blobWriter)
                        .setIoExecutor(ioExecutor)
                        .setPartitionTracker(partitionTracker);
        return isAdaptive ? builder.buildAdaptiveBatchJobScheduler() : builder.build();
    }

    /**
     * Deploys every task vertex of the given job vertex into a freshly created testing slot.
     *
     * <p>For each vertex, in this order: the produced partitions are registered (waiting for the
     * registration to finish), the execution is moved to {@link ExecutionState#SCHEDULED}, the slot
     * is assigned, and the vertex is deployed.
     *
     * @throws NullPointerException if the execution graph has no job vertex for {@code jobVertexID}
     */
    private static void deployTasks(ExecutionGraph executionGraph, JobVertexID jobVertexID, TestingLogicalSlotBuilder slotBuilder) throws JobException, ExecutionException, InterruptedException {
        ExecutionJobVertex jobVertex =
                Objects.requireNonNull(executionGraph.getJobVertex(jobVertexID));
        for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
            TestingLogicalSlot slot = slotBuilder.createTestingLogicalSlot();
            Execution execution = vertex.getCurrentExecutionAttempt();
            execution.registerProducedPartitions(slot.getTaskManagerLocation()).get();
            execution.transitionState(ExecutionState.SCHEDULED);
            vertex.tryAssignResource(slot);
            vertex.deploy();
        }
    }
}

