/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultExecutionOperations;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.DefaultSchedulerTest;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestExecutionOperationsDecorator;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerTest;
import org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Tests for {@link SpeculativeScheduler}.
 *
 * <p>Unless stated otherwise, each test builds a scheduler for the job graph returned by {@link
 * DefaultSchedulerTest#singleNonParallelJobVertexJobGraph()}, starts scheduling, and then drives
 * the scheduler by reporting slow tasks and task state updates.
 */
class SpeculativeSchedulerTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    private ScheduledExecutorService futureExecutor;
    private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
    private TestExecutionOperationsDecorator testExecutionOperations;
    private TestBlocklistOperations testBlocklistOperations;
    private TestRestartBackoffTimeStrategy restartStrategy;
    private TestExecutionSlotAllocatorFactory testExecutionSlotAllocatorFactory;
    private TestExecutionSlotAllocator testExecutionSlotAllocator;

    /** Creates fresh executors, recording decorators and test doubles for every test. */
    @BeforeEach
    void setUp() {
        futureExecutor = new DirectScheduledExecutorService();
        taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
        testExecutionOperations =
                new TestExecutionOperationsDecorator(new DefaultExecutionOperations());
        testBlocklistOperations = new TestBlocklistOperations();
        restartStrategy = new TestRestartBackoffTimeStrategy(true, 0L);
        testExecutionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
        testExecutionSlotAllocator =
                testExecutionSlotAllocatorFactory.getTestExecutionSlotAllocator();
    }

    /** Shuts down the future executor created in {@link #setUp()}, if any. */
    @AfterEach
    void tearDown() {
        if (futureExecutor != null) {
            ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, futureExecutor);
        }
    }

    /** Starting the scheduler deploys exactly one execution. */
    @Test
    void testStartScheduling() {
        createSchedulerAndStartScheduling();
        List<ExecutionAttemptID> deployedExecutions =
                testExecutionOperations.getDeployedExecutions();
        Assertions.assertThat(deployedExecutions).hasSize(1);
    }

    /**
     * Reporting a slow task deploys a second attempt, blocks the node of the slow attempt, and the
     * new attempt is DEPLOYING with a CREATED timestamp not earlier than the notification.
     */
    @Test
    void testNotifySlowTasks() {
        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(1);

        long timestamp = System.currentTimeMillis();
        notifySlowTask(scheduler, attempt1);

        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
        Assertions.assertThat(testBlocklistOperations.getAllBlockedNodeIds())
                .containsExactly(attempt1.getAssignedResourceLocation().getNodeId());

        Execution attempt2 = getExecution(ev, 1);
        Assertions.assertThat(attempt2.getState()).isEqualTo(ExecutionState.DEPLOYING);
        Assertions.assertThat(attempt2.getStateTimestamp(ExecutionState.CREATED))
                .isGreaterThanOrEqualTo(timestamp);
    }

    /**
     * Reporting the same slow task again does not deploy another attempt; once the speculative
     * attempt has failed, a further notification deploys a third one.
     */
    @Test
    void testNotifyDuplicatedSlowTasks() {
        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        notifySlowTask(scheduler, attempt1);
        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);

        // A duplicated notification must not change the number of deployments.
        notifySlowTask(scheduler, attempt1);
        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);

        Execution attempt2 = getExecution(ev, 1);
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(attempt2.getAttemptId()));

        notifySlowTask(scheduler, attempt1);
        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3);
    }

    /** When both attempts have failed, triggering the restart executor deploys a third attempt. */
    @Test
    void testRestartVertexIfAllSpeculativeExecutionFailed() {
        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        notifySlowTask(scheduler, attempt1);
        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);

        ExecutionAttemptID attemptId1 = attempt1.getAttemptId();
        ExecutionAttemptID attemptId2 = getExecution(ev, 1).getAttemptId();

        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(attemptId1));
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(attemptId2));
        taskRestartExecutor.triggerScheduledTasks();

        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3);
    }

    /** When only the original attempt fails, no further execution is deployed. */
    @Test
    void testNoRestartIfNotAllSpeculativeExecutionFailed() {
        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        notifySlowTask(scheduler, attempt1);
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(attempt1.getAttemptId()));
        taskRestartExecutor.triggerScheduledTasks();

        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
    }

    /**
     * A {@link PartitionNotFoundException} on one attempt moves the other attempt to CANCELING;
     * after cancellation completes and restarts are triggered, a third execution is deployed.
     */
    @Test
    void testRestartVertexIfPartitionExceptionHappened() {
        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        notifySlowTask(scheduler, attempt1);
        Execution attempt2 = getExecution(ev, 1);

        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(
                        attempt1.getAttemptId(),
                        new PartitionNotFoundException(new ResultPartitionID())));
        Assertions.assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELING);

        ExecutionGraphTestUtils.completeCancellingForAllVertices(scheduler.getExecutionGraph());
        taskRestartExecutor.triggerScheduledTasks();

        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3);
    }

    /** When one attempt finishes, the other already deployed attempt moves to CANCELING. */
    @Test
    void testCancelOtherDeployedCurrentExecutionsWhenAnyExecutionFinished() {
        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        notifySlowTask(scheduler, attempt1);
        Execution attempt2 = getExecution(ev, 1);

        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFinishedTaskExecutionState(attempt1.getAttemptId()));

        Assertions.assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELED.equals(null) ? null : ExecutionState.CANCELING);
    }

    /**
     * When one attempt finishes while the other attempt's slot request is still pending, the other
     * attempt ends up CANCELED.
     */
    @Test
    void testCancelOtherScheduledCurrentExecutionsWhenAnyExecutionFinished() {
        // Only the slot request of the first attempt is completed (manually) below.
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();

        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();
        testExecutionSlotAllocator.completePendingRequest(attempt1.getAttemptId());

        notifySlowTask(scheduler, attempt1);
        Execution attempt2 = getExecution(ev, 1);

        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFinishedTaskExecutionState(attempt1.getAttemptId()));

        Assertions.assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELED);
    }

    /**
     * After a {@link PartitionNotFoundException} on the original attempt, the exception history
     * holds a single entry whose failing task name is that of the original attempt.
     */
    @Test
    void testExceptionHistoryIfPartitionExceptionHappened() {
        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        notifySlowTask(scheduler, attempt1);

        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFailedTaskExecutionState(
                        attempt1.getAttemptId(),
                        new PartitionNotFoundException(new ResultPartitionID())));
        ExecutionGraphTestUtils.completeCancellingForAllVertices(scheduler.getExecutionGraph());
        taskRestartExecutor.triggerScheduledTasks();

        Assertions.assertThat(scheduler.getExceptionHistory()).hasSize(1);
        RootExceptionHistoryEntry entry = scheduler.getExceptionHistory().iterator().next();
        Assertions.assertThat(entry.getFailingTaskName())
                .isEqualTo(attempt1.getVertexWithAttempt());
    }

    /**
     * A failure of a single attempt is visible in the execution graph's failure info and in the
     * exception history, attributed to the failed attempt.
     */
    @Test
    void testLocalExecutionAttemptFailureIsCorrectlyRecorded() {
        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        notifySlowTask(scheduler, attempt1);

        TaskExecutionState failedState =
                SchedulerTestingUtils.createFailedTaskExecutionState(attempt1.getAttemptId());
        scheduler.updateTaskExecutionState(failedState);

        ClassLoader classLoader = SpeculativeSchedulerTest.class.getClassLoader();
        Assertions.assertThat(scheduler.getExecutionGraph().getFailureInfo()).isNotNull();
        Assertions.assertThat(scheduler.getExecutionGraph().getFailureInfo().getExceptionAsString())
                .contains(failedState.getError(classLoader).getMessage());

        Assertions.assertThat(scheduler.getExceptionHistory()).hasSize(1);
        RootExceptionHistoryEntry entry = scheduler.getExceptionHistory().iterator().next();
        Assertions.assertThat(entry.getFailingTaskName())
                .isEqualTo(attempt1.getVertexWithAttempt());
    }

    /** A {@link SuppressRestartsException} on a single attempt moves the job to FAILING. */
    @Test
    void testUnrecoverableLocalExecutionAttemptFailureWillFailJob() {
        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        notifySlowTask(scheduler, attempt1);

        TaskExecutionState failedState =
                SchedulerTestingUtils.createFailedTaskExecutionState(
                        attempt1.getAttemptId(),
                        new SuppressRestartsException(
                                new Exception("Forced failure for testing.")));
        scheduler.updateTaskExecutionState(failedState);

        Assertions.assertThat(scheduler.getExecutionGraph().getState())
                .isEqualTo(JobStatus.FAILING);
    }

    /** With restarts disallowed, a failure of a single attempt moves the job to FAILING. */
    @Test
    void testLocalExecutionAttemptFailureAndForbiddenRestartWillFailJob() {
        restartStrategy.setCanRestart(false);

        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        notifySlowTask(scheduler, attempt1);

        TaskExecutionState failedState =
                SchedulerTestingUtils.createFailedTaskExecutionState(attempt1.getAttemptId());
        scheduler.updateTaskExecutionState(failedState);

        Assertions.assertThat(scheduler.getExecutionGraph().getState())
                .isEqualTo(JobStatus.FAILING);
    }

    /** Result partition types used as parameters by the combined adaptive/speculative test. */
    static Stream<ResultPartitionType> supportedResultPartitionType() {
        return Stream.of(
                ResultPartitionType.BLOCKING,
                ResultPartitionType.HYBRID_FULL,
                ResultPartitionType.HYBRID_SELECTIVE);
    }

    /**
     * Runs a source/sink job where the sink's parallelism is initially unset (-1). Checks that
     * speculative attempts are created for the source, that the sink's parallelism becomes 3 once
     * a source attempt finishes, and that speculative attempts are then created for the sink too.
     */
    @ParameterizedTest
    @MethodSource(value = {"supportedResultPartitionType"})
    void testSpeculativeExecutionCombinedWithAdaptiveScheduling(
            ResultPartitionType resultPartitionType) throws Exception {
        JobVertex source = ExecutionGraphTestUtils.createNoOpVertex("source", 1);
        JobVertex sink = ExecutionGraphTestUtils.createNoOpVertex("sink", -1);
        sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, resultPartitionType);
        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);

        ComponentMainThreadExecutor mainThreadExecutor =
                ComponentMainThreadExecutorServiceAdapter.forMainThread();
        SpeculativeScheduler scheduler =
                createSchedulerBuilder(jobGraph, mainThreadExecutor)
                        .setVertexParallelismAndInputInfosDecider(
                                DefaultSchedulerBuilder.createCustomParallelismDecider(3))
                        .buildSpeculativeScheduler();
        mainThreadExecutor.execute(scheduler::startScheduling);

        DefaultExecutionGraph graph = (DefaultExecutionGraph) scheduler.getExecutionGraph();
        ExecutionJobVertex sourceExecutionJobVertex = graph.getJobVertex(source.getID());
        ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());

        ExecutionVertex sourceExecutionVertex = sourceExecutionJobVertex.getTaskVertices()[0];
        Assertions.assertThat(sourceExecutionVertex.getCurrentExecutions()).hasSize(1);

        // Trigger a speculative attempt for the source.
        Execution sourceAttempt1 = sourceExecutionVertex.getCurrentExecutionAttempt();
        notifySlowTask(scheduler, sourceAttempt1);
        Assertions.assertThat(sourceExecutionVertex.getCurrentExecutions()).hasSize(2);
        Assertions.assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(-1);

        // Finishing a source attempt lets the sink's parallelism be decided.
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFinishedTaskExecutionState(
                        sourceAttempt1.getAttemptId(),
                        AdaptiveBatchSchedulerTest.createResultPartitionBytesForExecution(
                                sourceAttempt1)));
        Assertions.assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(3);

        // Trigger a speculative attempt for the sink.
        ExecutionVertex sinkExecutionVertex = sinkExecutionJobVertex.getTaskVertices()[0];
        Execution sinkAttempt1 = sinkExecutionVertex.getCurrentExecutionAttempt();
        notifySlowTask(scheduler, sinkAttempt1);
        Assertions.assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2);
    }

    /**
     * The slow-vertex count is 1 after a slow task is reported, stays 1 on a repeated report, and
     * drops to zero when an empty set of slow tasks is reported.
     */
    @Test
    void testNumSlowExecutionVerticesMetric() {
        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        notifySlowTask(scheduler, attempt1);
        Assertions.assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1L);

        notifySlowTask(scheduler, attempt1);
        Assertions.assertThat(scheduler.getNumSlowExecutionVertices()).isEqualTo(1L);

        scheduler.notifySlowTasks(Collections.emptyMap());
        Assertions.assertThat(scheduler.getNumSlowExecutionVertices()).isZero();
    }

    /**
     * The effective-speculative-execution count is 1 when the speculative attempt finishes first,
     * is zero again after a global failure and restart, and stays zero when the attempt that was
     * reported as slow is itself the one that finishes.
     */
    @Test
    void testEffectiveSpeculativeExecutionsMetric() {
        SpeculativeScheduler scheduler = createSchedulerAndStartScheduling();
        ExecutionVertex ev = getOnlyExecutionVertex(scheduler);
        Execution attempt1 = ev.getCurrentExecutionAttempt();

        notifySlowTask(scheduler, attempt1);

        // The speculative attempt finishes first.
        Execution attempt2 = getExecution(ev, 1);
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFinishedTaskExecutionState(attempt2.getAttemptId()));
        Assertions.assertThat(scheduler.getNumEffectiveSpeculativeExecutions()).isEqualTo(1L);

        // Complete cancellation of the original attempt, then force a global restart.
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createCanceledTaskExecutionState(attempt1.getAttemptId()));
        scheduler.handleGlobalFailure(new Exception());
        taskRestartExecutor.triggerScheduledTasks();
        Assertions.assertThat(scheduler.getNumEffectiveSpeculativeExecutions()).isZero();

        // The attempt reported as slow finishes itself.
        Execution attempt3 = getExecution(ev, 2);
        notifySlowTask(scheduler, attempt3);
        scheduler.updateTaskExecutionState(
                SchedulerTestingUtils.createFinishedTaskExecutionState(attempt3.getAttemptId()));
        Assertions.assertThat(scheduler.getNumEffectiveSpeculativeExecutions()).isZero();
    }

    /**
     * Returns the current execution of the given vertex that has the given attempt number.
     *
     * @param executionVertex vertex whose current executions are searched
     * @param attemptNumber attempt number to look for
     * @return the first current execution with a matching attempt number
     * @throws AssertionError if the vertex has no current execution with that attempt number
     */
    private static Execution getExecution(ExecutionVertex executionVertex, int attemptNumber) {
        return executionVertex.getCurrentExecutions().stream()
                .filter(execution -> execution.getAttemptNumber() == attemptNumber)
                .findFirst()
                .orElseThrow(
                        () ->
                                new AssertionError(
                                        "No current execution with attempt number "
                                                + attemptNumber));
    }

    /** Returns the single execution vertex of the scheduler's execution graph. */
    private static ExecutionVertex getOnlyExecutionVertex(SpeculativeScheduler scheduler) {
        return Iterables.getOnlyElement(scheduler.getExecutionGraph().getAllExecutionVertices());
    }

    /** Creates and starts a scheduler for the default single-vertex job graph. */
    private SpeculativeScheduler createSchedulerAndStartScheduling() {
        return createSchedulerAndStartScheduling(
                DefaultSchedulerTest.singleNonParallelJobVertexJobGraph());
    }

    /**
     * Creates a scheduler for the given job graph and starts scheduling via the main thread
     * executor.
     *
     * @param jobGraph job graph to schedule
     * @return the started scheduler
     * @throws RuntimeException wrapping any exception raised while creating or starting it
     */
    private SpeculativeScheduler createSchedulerAndStartScheduling(JobGraph jobGraph) {
        ComponentMainThreadExecutor mainThreadExecutor =
                ComponentMainThreadExecutorServiceAdapter.forMainThread();
        try {
            SpeculativeScheduler scheduler = createScheduler(jobGraph, mainThreadExecutor);
            mainThreadExecutor.execute(scheduler::startScheduling);
            return scheduler;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Builds a speculative scheduler from {@link #createSchedulerBuilder}. */
    private SpeculativeScheduler createScheduler(
            JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) throws Exception {
        return createSchedulerBuilder(jobGraph, mainThreadExecutor).buildSpeculativeScheduler();
    }

    /**
     * Creates a scheduler builder wired with this test's executors, recording decorators, restart
     * strategy and slot allocator factory.
     */
    private DefaultSchedulerBuilder createSchedulerBuilder(
            JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
        // Use a one-day check interval for the slow task detector; presumably this keeps the
        // periodic detection from interfering with the manually driven notifications.
        Configuration configuration = new Configuration();
        configuration.set(SlowTaskDetectorOptions.CHECK_INTERVAL, Duration.ofDays(1L));

        return new DefaultSchedulerBuilder(
                        jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
                .setBlocklistOperations(testBlocklistOperations)
                .setExecutionOperations(testExecutionOperations)
                .setFutureExecutor(futureExecutor)
                .setDelayExecutor(taskRestartExecutor)
                .setRestartBackoffTimeStrategy(restartStrategy)
                .setExecutionSlotAllocatorFactory(testExecutionSlotAllocatorFactory)
                .setJobMasterConfiguration(configuration);
    }

    /** Reports the given execution to the scheduler as the only slow task of its vertex. */
    private static void notifySlowTask(SpeculativeScheduler scheduler, Execution slowTask) {
        scheduler.notifySlowTasks(
                ImmutableMap.of(
                        slowTask.getVertex().getID(),
                        Collections.singleton(slowTask.getAttemptId())));
    }

    /** {@link BlocklistOperations} that records every blocked node passed to it. */
    private static class TestBlocklistOperations implements BlocklistOperations {
        private final List<BlockedNode> blockedNodes = new ArrayList<>();

        @Override
        public void addNewBlockedNodes(Collection<BlockedNode> newNodes) {
            blockedNodes.addAll(newNodes);
        }

        /** Returns the distinct node ids of all blocked nodes recorded so far. */
        public Set<String> getAllBlockedNodeIds() {
            return blockedNodes.stream().map(BlockedNode::getNodeId).collect(Collectors.toSet());
        }
    }
}

