package org.apache.flink.runtime.executiongraph.failover;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.TestingFailureEnricher;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.IterableUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest.class */
class ExecutionFailureHandlerTest {

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private static final long RESTART_DELAY_MS = 1234;
    private SchedulingTopology schedulingTopology;
    private TestFailoverStrategy failoverStrategy;
    private AtomicBoolean isNewAttempt;
    private TestRestartBackoffTimeStrategy backoffTimeStrategy;
    private ExecutionFailureHandler executionFailureHandler;
    private TestingFailureEnricher testingFailureEnricher;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandlerTest$TestFailoverStrategy.class */
    private static class TestFailoverStrategy implements FailoverStrategy {
        private Set<ExecutionVertexID> tasksToRestart;

        public void setTasksToRestart(Set<ExecutionVertexID> set) {
            this.tasksToRestart = set;
        }

        public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexID, Throwable th) {
            return this.tasksToRestart;
        }
    }

    ExecutionFailureHandlerTest() {
    }

    @BeforeEach
    void setUp() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        testingSchedulingTopology.newExecutionVertex();
        this.schedulingTopology = testingSchedulingTopology;
        this.failoverStrategy = new TestFailoverStrategy();
        this.testingFailureEnricher = new TestingFailureEnricher();
        this.isNewAttempt = new AtomicBoolean(true);
        AtomicBoolean atomicBoolean = this.isNewAttempt;
        atomicBoolean.getClass();
        this.backoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS, atomicBoolean::get);
        this.executionFailureHandler = new ExecutionFailureHandler(this.schedulingTopology, this.failoverStrategy, this.backoffTimeStrategy, ComponentMainThreadExecutorServiceAdapter.forMainThread(), Collections.singleton(this.testingFailureEnricher), (FailureEnricher.Context) null, (FailureEnricher.Context) null);
    }

    @Test
    void testNormalFailureHandling() throws Exception {
        Set<ExecutionVertexID> singleton = Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
        this.failoverStrategy.setTasksToRestart(singleton);
        Execution createExecution = FailureHandlingResultTest.createExecution((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        Exception exc = new Exception("test failure");
        long currentTimeMillis = System.currentTimeMillis();
        FailureHandlingResult failureHandlingResult = this.executionFailureHandler.getFailureHandlingResult(createExecution, exc, currentTimeMillis);
        Assertions.assertThat(failureHandlingResult.canRestart()).isTrue();
        Assertions.assertThat(failureHandlingResult.getFailedExecution()).isPresent();
        Assertions.assertThat(failureHandlingResult.getFailedExecution().get()).isSameAs(createExecution);
        Assertions.assertThat(failureHandlingResult.getRestartDelayMS()).isEqualTo(RESTART_DELAY_MS);
        Assertions.assertThat(failureHandlingResult.getVerticesToRestart()).isEqualTo(singleton);
        Assertions.assertThat(failureHandlingResult.getError()).isSameAs(exc);
        Assertions.assertThat(failureHandlingResult.getTimestamp()).isEqualTo(currentTimeMillis);
        Assertions.assertThat(this.testingFailureEnricher.getSeenThrowables()).containsExactly(new Throwable[]{exc});
        Assertions.assertThat((Map) failureHandlingResult.getFailureLabels().get()).isEqualTo(this.testingFailureEnricher.getFailureLabels());
        Assertions.assertThat(this.executionFailureHandler.getNumberOfRestarts()).isOne();
    }

    @Test
    void testRestartingSuppressedFailureHandlingResult() throws Exception {
        this.backoffTimeStrategy.setCanRestart(false);
        Execution createExecution = FailureHandlingResultTest.createExecution((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        Exception exc = new Exception("expected test failure");
        long currentTimeMillis = System.currentTimeMillis();
        FailureHandlingResult failureHandlingResult = this.executionFailureHandler.getFailureHandlingResult(createExecution, exc, currentTimeMillis);
        Assertions.assertThat(failureHandlingResult.canRestart()).isFalse();
        Assertions.assertThat(failureHandlingResult.getFailedExecution()).isPresent();
        Assertions.assertThat(failureHandlingResult.getFailedExecution().get()).isSameAs(createExecution);
        Assertions.assertThat(failureHandlingResult.getError()).hasCause(exc);
        Assertions.assertThat(failureHandlingResult.getTimestamp()).isEqualTo(currentTimeMillis);
        Assertions.assertThat(this.testingFailureEnricher.getSeenThrowables()).containsExactly(new Throwable[]{exc});
        Assertions.assertThat((Map) failureHandlingResult.getFailureLabels().get()).isEqualTo(this.testingFailureEnricher.getFailureLabels());
        Assertions.assertThat(ExecutionFailureHandler.isUnrecoverableError(failureHandlingResult.getError())).isFalse();
        failureHandlingResult.getClass();
        Assertions.assertThatThrownBy(failureHandlingResult::getVerticesToRestart).as("getVerticesToRestart is not allowed when restarting is suppressed", new Object[0]).isInstanceOf(IllegalStateException.class);
        failureHandlingResult.getClass();
        Assertions.assertThatThrownBy(failureHandlingResult::getRestartDelayMS).as("getRestartDelayMS is not allowed when restarting is suppressed", new Object[0]).isInstanceOf(IllegalStateException.class);
        Assertions.assertThat(this.executionFailureHandler.getNumberOfRestarts()).isZero();
    }

    @Test
    void testNonRecoverableFailureHandlingResult() throws Exception {
        Execution createExecution = FailureHandlingResultTest.createExecution((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        Exception exc = new Exception((Throwable) new SuppressRestartsException(new Exception("test failure")));
        long currentTimeMillis = System.currentTimeMillis();
        this.isNewAttempt.set(false);
        FailureHandlingResult failureHandlingResult = this.executionFailureHandler.getFailureHandlingResult(createExecution, exc, currentTimeMillis);
        Assertions.assertThat(failureHandlingResult.canRestart()).isFalse();
        Assertions.assertThat(failureHandlingResult.getFailedExecution()).isPresent();
        Assertions.assertThat(failureHandlingResult.getFailedExecution().get()).isSameAs(createExecution);
        Assertions.assertThat(failureHandlingResult.getError()).isNotNull();
        Assertions.assertThat(ExecutionFailureHandler.isUnrecoverableError(failureHandlingResult.getError())).isTrue();
        Assertions.assertThat(this.testingFailureEnricher.getSeenThrowables()).containsExactly(new Throwable[]{exc});
        Assertions.assertThat((Map) failureHandlingResult.getFailureLabels().get()).isEqualTo(this.testingFailureEnricher.getFailureLabels());
        Assertions.assertThat(failureHandlingResult.getTimestamp()).isEqualTo(currentTimeMillis);
        ((AbstractBooleanAssert) Assertions.assertThat(failureHandlingResult.isRootCause()).as("A NonRecoverableFailure should be new attempt even if RestartBackoffTimeStrategy consider it's not new attempt.", new Object[0])).isTrue();
        failureHandlingResult.getClass();
        Assertions.assertThatThrownBy(failureHandlingResult::getVerticesToRestart).as("getVerticesToRestart is not allowed when restarting is suppressed", new Object[0]).isInstanceOf(IllegalStateException.class);
        failureHandlingResult.getClass();
        Assertions.assertThatThrownBy(failureHandlingResult::getRestartDelayMS).as("getRestartDelayMS is not allowed when restarting is suppressed", new Object[0]).isInstanceOf(IllegalStateException.class);
        Assertions.assertThat(this.executionFailureHandler.getNumberOfRestarts()).isZero();
    }

    @Test
    void testNewAttemptAndNumberOfRestarts() throws Exception {
        this.failoverStrategy.setTasksToRestart(Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0)));
        Execution createExecution = FailureHandlingResultTest.createExecution((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        Exception exc = new Exception("expected test failure");
        testHandlingRootException(createExecution, exc);
        this.isNewAttempt.set(false);
        testHandlingConcurrentException(createExecution, exc);
        testHandlingConcurrentException(createExecution, exc);
        this.isNewAttempt.set(true);
        testHandlingRootException(createExecution, exc);
        testHandlingRootException(createExecution, exc);
        this.isNewAttempt.set(false);
        testHandlingConcurrentException(createExecution, exc);
        testHandlingConcurrentException(createExecution, exc);
    }

    private void testHandlingRootException(Execution execution, Throwable th) {
        long numberOfRestarts = this.executionFailureHandler.getNumberOfRestarts();
        ((AbstractBooleanAssert) Assertions.assertThat(this.executionFailureHandler.getFailureHandlingResult(execution, th, System.currentTimeMillis()).isRootCause()).as("The FailureHandlingResult should be the root cause if exception is new attempt.", new Object[0])).isTrue();
        Assertions.assertThat(this.executionFailureHandler.getNumberOfRestarts()).as("The numberOfRestarts should be increased when it's a root exception.", new Object[0]).isEqualTo(numberOfRestarts + 1);
    }

    private void testHandlingConcurrentException(Execution execution, Throwable th) {
        long numberOfRestarts = this.executionFailureHandler.getNumberOfRestarts();
        ((AbstractBooleanAssert) Assertions.assertThat(this.executionFailureHandler.getFailureHandlingResult(execution, th, System.currentTimeMillis()).isRootCause()).as("The FailureHandlingResult shouldn't be the root cause if exception isn't new attempt.", new Object[0])).isFalse();
        Assertions.assertThat(this.executionFailureHandler.getNumberOfRestarts()).as("The numberOfRestarts shouldn't be increased when it isn't a root exception.", new Object[0]).isEqualTo(numberOfRestarts);
    }

    @Test
    void testUnrecoverableErrorCheck() {
        Assertions.assertThat(ExecutionFailureHandler.isUnrecoverableError(new Exception())).isFalse();
        Assertions.assertThat(ExecutionFailureHandler.isUnrecoverableError(new SuppressRestartsException(new Exception()))).isTrue();
        Assertions.assertThat(ExecutionFailureHandler.isUnrecoverableError(new Exception((Throwable) new SuppressRestartsException(new Exception())))).isTrue();
    }

    @Test
    void testGlobalFailureHandling() throws ExecutionException, InterruptedException {
        Exception exc = new Exception("Expected test failure");
        long currentTimeMillis = System.currentTimeMillis();
        FailureHandlingResult globalFailureHandlingResult = this.executionFailureHandler.getGlobalFailureHandlingResult(exc, currentTimeMillis);
        Assertions.assertThat(globalFailureHandlingResult.getVerticesToRestart()).isEqualTo(IterableUtils.toStream(this.schedulingTopology.getVertices()).map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet()));
        Assertions.assertThat(globalFailureHandlingResult.getError()).isSameAs(exc);
        Assertions.assertThat(globalFailureHandlingResult.getTimestamp()).isEqualTo(currentTimeMillis);
        Assertions.assertThat(this.testingFailureEnricher.getSeenThrowables()).containsExactly(new Throwable[]{exc});
        Assertions.assertThat((Map) globalFailureHandlingResult.getFailureLabels().get()).isEqualTo(this.testingFailureEnricher.getFailureLabels());
    }
}
