/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.slowtaskdetector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests for {@link ExecutionTimeBasedSlowTaskDetector}.
 *
 * <p>Most tests build an execution graph whose vertices are all switched to running, optionally
 * mark individual executions as finished, and then check how many execution vertices
 * {@code findSlowTasks} reports. The detector under test is configured through the three
 * {@code SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_*} options (ratio, multiplier and lower
 * bound).
 */
class ExecutionTimeBasedSlowTaskDetectorTest {
    /** Parallelism of every job vertex created via {@code createNoOpVertex} in this class. */
    private static final int PARALLELISM = 3;

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /** With a baseline ratio of 0 and no finished task, every running task is reported. */
    @Test
    void testNoFinishedTaskButRatioIsZero() throws Exception {
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.0, 1.0, 0L);

        Map<?, ?> slowTasks = slowTaskDetector.findSlowTasks(executionGraph);

        Assertions.assertThat(slowTasks).hasSize(PARALLELISM);
    }

    /**
     * Uses a scheduler that is never started, so the graph is not switched to running here;
     * nothing must be reported as slow in that state.
     */
    @Test
    void testAllTasksInCreatedAndNoSlowTasks() throws Exception {
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
        // Deliberately no startScheduling()/switchAllVerticesToRunning() call.
        ExecutionGraph executionGraph = SchedulerTestingUtils.createScheduler(
                        jobGraph,
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        EXECUTOR_RESOURCE.getExecutor())
                .getExecutionGraph();
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.0, 1.0, 0L);

        Map<?, ?> slowTasks = slowTaskDetector.findSlowTasks(executionGraph);

        Assertions.assertThat(slowTasks).isEmpty();
    }

    /** One finished task out of three with ratio 0.5: no slow task is reported. */
    @Test
    void testFinishedTaskNotExceedRatio() throws Exception {
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.5, 1.0, 0L);

        getTaskVertex(executionGraph, jobVertex, 0).getCurrentExecutionAttempt().markFinished();

        Map<?, ?> slowTasks = slowTaskDetector.findSlowTasks(executionGraph);

        Assertions.assertThat(slowTasks).isEmpty();
    }

    /** One finished task out of three with ratio 0.3: the two remaining tasks are reported. */
    @Test
    void testFinishedTaskExceedRatio() throws Exception {
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.3, 1.0, 0L);

        getTaskVertex(executionGraph, jobVertex, 2).getCurrentExecutionAttempt().markFinished();

        Map<?, ?> slowTasks = slowTaskDetector.findSlowTasks(executionGraph);

        Assertions.assertThat(slowTasks).hasSize(2);
    }

    /**
     * Same setup as {@link #testFinishedTaskExceedRatio()} but with a lower bound of
     * {@code Integer.MAX_VALUE} milliseconds: no slow task is reported.
     */
    @Test
    void testLargeLowerBound() throws Exception {
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
                createSlowTaskDetector(0.3, 1.0, Integer.MAX_VALUE);

        getTaskVertex(executionGraph, jobVertex, 2).getCurrentExecutionAttempt().markFinished();

        Map<?, ?> slowTasks = slowTaskDetector.findSlowTasks(executionGraph);

        Assertions.assertThat(slowTasks).isEmpty();
    }

    /**
     * Same setup as {@link #testFinishedTaskExceedRatio()} but with a multiplier of 1,000,000:
     * no slow task is reported.
     */
    @Test
    void testLargeMultiplier() throws Exception {
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
                createSlowTaskDetector(0.3, 1000000.0, 0L);

        // NOTE(review): presumably ensures the finished task has a non-zero execution time so
        // the multiplied baseline is not zero - confirm against the detector implementation.
        Thread.sleep(10L);
        getTaskVertex(executionGraph, jobVertex, 2).getCurrentExecutionAttempt().markFinished();

        Map<?, ?> slowTasks = slowTaskDetector.findSlowTasks(executionGraph);

        Assertions.assertThat(slowTasks).isEmpty();
    }

    /**
     * Two connected job vertices, each with one finished task and ratio 0.3: the two remaining
     * tasks of each vertex (four in total) are reported.
     */
    @Test
    void testMultipleJobVertexFinishedTaskExceedRatio() throws Exception {
        JobVertex jobVertex1 = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        JobVertex jobVertex2 = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        connectAllToAllPipelined(jobVertex1, jobVertex2);
        ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.3, 1.0, 0L);

        getTaskVertex(executionGraph, jobVertex1, 2).getCurrentExecutionAttempt().markFinished();
        getTaskVertex(executionGraph, jobVertex2, 2).getCurrentExecutionAttempt().markFinished();

        Map<?, ?> slowTasks = slowTaskDetector.findSlowTasks(executionGraph);

        Assertions.assertThat(slowTasks).hasSize(4);
    }

    /**
     * Graph built by the adaptive batch scheduler where the downstream vertex is created without
     * an explicit parallelism: after finishing one upstream task, two slow tasks are reported.
     */
    @Test
    void testFinishedTaskExceedRatioInDynamicGraph() throws Exception {
        JobVertex jobVertex1 = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        // No parallelism is set on the second vertex in this test.
        JobVertex jobVertex2 = new JobVertex("vertex2");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        connectAllToAllPipelined(jobVertex1, jobVertex2);
        ExecutionGraph executionGraph = createDynamicExecutionGraph(jobVertex1, jobVertex2);
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.3, 1.0, 0L);

        getTaskVertex(executionGraph, jobVertex1, 2).getCurrentExecutionAttempt().markFinished();

        Map<?, ?> slowTasks = slowTaskDetector.findSlowTasks(executionGraph);

        Assertions.assertThat(slowTasks).hasSize(2);
    }

    /**
     * All downstream tasks have the same input bytes (1024) and one of them is finished: two slow
     * tasks are reported.
     */
    @Test
    void testBalancedInput() throws Exception {
        JobVertex jobVertex1 = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        JobVertex jobVertex2 = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        connectAllToAllPipelined(jobVertex1, jobVertex2);
        ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.3, 1.0, 0L);

        ExecutionVertex ev21 = getTaskVertex(executionGraph, jobVertex2, 0);
        ExecutionVertex ev22 = getTaskVertex(executionGraph, jobVertex2, 1);
        ExecutionVertex ev23 = getTaskVertex(executionGraph, jobVertex2, 2);
        ev21.setInputBytes(1024L);
        ev22.setInputBytes(1024L);
        ev23.setInputBytes(1024L);
        ev23.getCurrentExecutionAttempt().markFinished();

        Map<?, ?> slowTasks = slowTaskDetector.findSlowTasks(executionGraph);

        Assertions.assertThat(slowTasks).hasSize(2);
    }

    /**
     * Same setup as {@link #testBalancedInput()} but with a lower bound of
     * {@code Integer.MAX_VALUE} milliseconds: no slow task is reported.
     */
    @Test
    void testBalancedInputWithLargeLowerBound() throws Exception {
        JobVertex jobVertex1 = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        JobVertex jobVertex2 = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        connectAllToAllPipelined(jobVertex1, jobVertex2);
        ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
                createSlowTaskDetector(0.3, 1.0, Integer.MAX_VALUE);

        ExecutionVertex ev21 = getTaskVertex(executionGraph, jobVertex2, 0);
        ExecutionVertex ev22 = getTaskVertex(executionGraph, jobVertex2, 1);
        ExecutionVertex ev23 = getTaskVertex(executionGraph, jobVertex2, 2);
        ev21.setInputBytes(1024L);
        ev22.setInputBytes(1024L);
        ev23.setInputBytes(1024L);
        ev23.getCurrentExecutionAttempt().markFinished();

        Map<?, ?> slowTasks = slowTaskDetector.findSlowTasks(executionGraph);

        Assertions.assertThat(slowTasks).isEmpty();
    }

    /**
     * Downstream tasks have very different input bytes and the task with the smallest input is
     * finished after one second: no slow task is reported.
     */
    @Test
    void testUnbalancedInput() throws Exception {
        JobVertex jobVertex1 = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        JobVertex jobVertex2 = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        connectAllToAllPipelined(jobVertex1, jobVertex2);
        ExecutionGraph executionGraph = createExecutionGraph(jobVertex1, jobVertex2);
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector = createSlowTaskDetector(0.3, 1.0, 0L);

        ExecutionVertex ev21 = getTaskVertex(executionGraph, jobVertex2, 0);
        ExecutionVertex ev22 = getTaskVertex(executionGraph, jobVertex2, 1);
        ExecutionVertex ev23 = getTaskVertex(executionGraph, jobVertex2, 2);
        ev21.setInputBytes(1024L);
        ev22.setInputBytes(1024000L);
        ev23.setInputBytes(4096000L);

        // NOTE(review): presumably lets the small-input task accumulate a large execution time
        // relative to its input bytes before it finishes - confirm against the detector.
        Thread.sleep(1000L);
        ev21.getCurrentExecutionAttempt().markFinished();

        Map<?, ?> slowTasks = slowTaskDetector.findSlowTasks(executionGraph);

        Assertions.assertThat(slowTasks).isEmpty();
    }

    /**
     * Checks the natural ordering of {@code ExecutionTimeWithInputBytes} for several argument
     * combinations, including the case where {@code compareTo} is expected to throw.
     */
    @Test
    void testSortedExecutionTimeWithInputBytes() {
        List<ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes> pairList = new ArrayList<>();

        // (10, 10) vs (10, 20): the second one sorts first.
        ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes executionTimeWithInputBytes1 =
                new ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes(10L, 10L);
        ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes executionTimeWithInputBytes2 =
                new ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes(10L, 20L);
        pairList.add(executionTimeWithInputBytes1);
        pairList.add(executionTimeWithInputBytes2);
        List<ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes> sortedList =
                pairList.stream().sorted().collect(Collectors.toList());
        Assertions.assertThat(sortedList.get(0)).isEqualTo(executionTimeWithInputBytes2);

        // (20, 10) vs (10, 10): the second one sorts first.
        ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes executionTimeWithInputBytes3 =
                new ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes(20L, 10L);
        ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes executionTimeWithInputBytes4 =
                new ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes(10L, 10L);
        pairList.clear();
        pairList.add(executionTimeWithInputBytes3);
        pairList.add(executionTimeWithInputBytes4);
        sortedList = pairList.stream().sorted().collect(Collectors.toList());
        Assertions.assertThat(sortedList.get(0)).isEqualTo(executionTimeWithInputBytes4);

        // (20, -1) vs (10, -1): the second one sorts first.
        ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes executionTimeWithInputBytes5 =
                new ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes(20L, -1L);
        ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes executionTimeWithInputBytes6 =
                new ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes(10L, -1L);
        pairList.clear();
        pairList.add(executionTimeWithInputBytes5);
        pairList.add(executionTimeWithInputBytes6);
        sortedList = pairList.stream().sorted().collect(Collectors.toList());
        Assertions.assertThat(sortedList.get(0)).isEqualTo(executionTimeWithInputBytes6);

        // A zero second argument: (1, 0) compares greater than (10, 10), (0, 0) compares less.
        ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes executionTimeWithInputBytes7 =
                new ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes(1L, 0L);
        Assertions.assertThat(executionTimeWithInputBytes7.compareTo(executionTimeWithInputBytes1)).isEqualTo(1);
        ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes executionTimeWithInputBytes8 =
                new ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes(0L, 0L);
        Assertions.assertThat(executionTimeWithInputBytes8.compareTo(executionTimeWithInputBytes1)).isEqualTo(-1);

        // Comparing (20, 100) with (15, -1) must be rejected.
        ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes executionTimeWithInputBytes9 =
                new ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes(20L, 100L);
        ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes executionTimeWithInputBytes10 =
                new ExecutionTimeBasedSlowTaskDetector.ExecutionTimeWithInputBytes(15L, -1L);
        Assertions.assertThatThrownBy(() -> executionTimeWithInputBytes9.compareTo(executionTimeWithInputBytes10))
                .isInstanceOf(IllegalArgumentException.class);
    }

    /**
     * An exception thrown by the slow-task listener must end up in the fatal error handler that
     * was passed to the detector.
     */
    @Test
    void testHandleNotifySlowTasksException() throws Exception {
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(PARALLELISM);
        ExecutionGraph executionGraph = createExecutionGraph(jobVertex);
        TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
        ExecutionTimeBasedSlowTaskDetector slowTaskDetector =
                createSlowTaskDetector(0.0, 1.0, 0L, fatalErrorHandler);
        RuntimeException exception = new RuntimeException("test");

        slowTaskDetector.start(
                executionGraph,
                ignored -> {
                    throw exception;
                },
                new ComponentMainThreadExecutorServiceAdapter(
                        EXECUTOR_RESOURCE.getExecutor(), Thread.currentThread()));
        // Block until the scheduled detection future completes before inspecting the handler.
        slowTaskDetector.getScheduledDetectionFuture().get();

        Assertions.assertThat(fatalErrorHandler.getException()).isEqualTo(exception);
    }

    /**
     * Builds a streaming job graph from the given vertices, schedules it with the default test
     * scheduler and switches all execution vertices to running.
     *
     * @param jobVertices vertices of the job graph
     * @return the execution graph of the started scheduler
     */
    private ExecutionGraph createExecutionGraph(JobVertex ... jobVertices) throws Exception {
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);
        DefaultScheduler scheduler = SchedulerTestingUtils.createScheduler(
                jobGraph,
                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                EXECUTOR_RESOURCE.getExecutor());
        return startSchedulingAndSwitchToRunning(scheduler);
    }

    /**
     * Same as {@link #createExecutionGraph(JobVertex...)} but uses a scheduler obtained from
     * {@code DefaultSchedulerBuilder#buildAdaptiveBatchJobScheduler()}.
     *
     * @param jobVertices vertices of the job graph
     * @return the execution graph of the started scheduler
     */
    private ExecutionGraph createDynamicExecutionGraph(JobVertex ... jobVertices) throws Exception {
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);
        AdaptiveBatchScheduler scheduler = new DefaultSchedulerBuilder(
                        jobGraph,
                        ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                        EXECUTOR_RESOURCE.getExecutor())
                .buildAdaptiveBatchJobScheduler();
        return startSchedulingAndSwitchToRunning(scheduler);
    }

    /**
     * Starts the scheduler and switches every vertex of its execution graph to running.
     *
     * <p>The graph is fetched before scheduling starts, matching the order both callers used
     * before this helper was extracted.
     */
    private static ExecutionGraph startSchedulingAndSwitchToRunning(DefaultScheduler scheduler) {
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(executionGraph);
        return executionGraph;
    }

    /** Connects {@code downstream} to {@code upstream} with an all-to-all pipelined data set. */
    private static void connectAllToAllPipelined(JobVertex upstream, JobVertex downstream) {
        downstream.connectNewDataSetAsInput(upstream, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
    }

    /** Returns the execution vertex at {@code subtaskIndex} of the given job vertex. */
    private static ExecutionVertex getTaskVertex(ExecutionGraph executionGraph, JobVertex jobVertex, int subtaskIndex) {
        return executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[subtaskIndex];
    }

    /**
     * Creates a detector configured with the given baseline ratio, multiplier and lower bound.
     *
     * @param ratio value for {@code EXECUTION_TIME_BASELINE_RATIO}
     * @param multiplier value for {@code EXECUTION_TIME_BASELINE_MULTIPLIER}
     * @param lowerBoundMillis value for {@code EXECUTION_TIME_BASELINE_LOWER_BOUND}, in milliseconds
     */
    private ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector(double ratio, double multiplier, long lowerBoundMillis) {
        Configuration configuration = createSlowTaskDetectorConfiguration(ratio, multiplier, lowerBoundMillis);
        return new ExecutionTimeBasedSlowTaskDetector(configuration);
    }

    /**
     * Variant of {@link #createSlowTaskDetector(double, double, long)} that additionally passes a
     * fatal error handler to the detector.
     */
    private ExecutionTimeBasedSlowTaskDetector createSlowTaskDetector(double ratio, double multiplier, long lowerBoundMillis, FatalErrorHandler fatalErrorHandler) {
        Configuration configuration = createSlowTaskDetectorConfiguration(ratio, multiplier, lowerBoundMillis);
        return new ExecutionTimeBasedSlowTaskDetector(configuration, fatalErrorHandler);
    }

    /**
     * Builds a configuration carrying the three execution-time baseline options.
     *
     * <p>Values are passed with their real types (no {@code Object} casts) so that the generic
     * signature of {@code Configuration#set} can check them against each option's type.
     */
    @Nonnull
    private static Configuration createSlowTaskDetectorConfiguration(double ratio, double multiplier, long lowerBoundMillis) {
        Configuration configuration = new Configuration();
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND, Duration.ofMillis(lowerBoundMillis));
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, ratio);
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, multiplier);
        return configuration;
    }
}

