/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.exceptionhistory;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedThrowable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class FailureHandlingResultSnapshotTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private ExecutionGraph executionGraph;

    FailureHandlingResultSnapshotTest() {
    }

    @BeforeEach
    void setup() throws JobException, JobExecutionException {
        JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        jobGraph.getVertices().forEach(v -> v.setParallelism(3));
        this.executionGraph = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        this.executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
    }

    @Test
    void testRootCauseVertexNotFailed() {
        ExecutionVertex rootCauseExecutionVertex = this.extractExecutionVertex(0);
        FailureHandlingResult failureHandlingResult = FailureHandlingResult.restartable((Execution)rootCauseExecutionVertex.getCurrentExecutionAttempt(), (Throwable)new RuntimeException("Expected exception: root cause"), (long)System.currentTimeMillis(), (CompletableFuture)FailureEnricherUtils.EMPTY_FAILURE_LABELS, StreamSupport.stream(this.executionGraph.getAllExecutionVertices().spliterator(), false).map(ExecutionVertex::getID).collect(Collectors.toSet()), (long)0L, (boolean)false);
        Assertions.assertThatThrownBy(() -> FailureHandlingResultSnapshot.create((FailureHandlingResult)failureHandlingResult, this::getCurrentExecutions)).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testMissingThrowableHandling() throws ExecutionException, InterruptedException {
        ExecutionVertex rootCauseExecutionVertex = this.extractExecutionVertex(0);
        long rootCauseTimestamp = this.triggerFailure(rootCauseExecutionVertex, null);
        FailureHandlingResult failureHandlingResult = FailureHandlingResult.restartable((Execution)rootCauseExecutionVertex.getCurrentExecutionAttempt(), null, (long)rootCauseTimestamp, CompletableFuture.completedFuture(Collections.singletonMap("key2", "value2")), StreamSupport.stream(this.executionGraph.getAllExecutionVertices().spliterator(), false).map(ExecutionVertex::getID).collect(Collectors.toSet()), (long)0L, (boolean)false);
        Assertions.assertThat((Map)((Map)failureHandlingResult.getFailureLabels().get())).isEqualTo(Collections.singletonMap("key2", "value2"));
        FailureHandlingResultSnapshot testInstance = FailureHandlingResultSnapshot.create((FailureHandlingResult)failureHandlingResult, this::getCurrentExecutions);
        Throwable actualException = new SerializedThrowable(testInstance.getRootCause()).deserializeError(ClassLoader.getSystemClassLoader());
        Assertions.assertThat((Throwable)actualException).isInstanceOf(FlinkException.class);
        Assertions.assertThat((Throwable)actualException).hasMessageContaining(ErrorInfo.handleMissingThrowable(null).getMessage());
        Assertions.assertThat((long)testInstance.getTimestamp()).isEqualTo(rootCauseTimestamp);
        Assertions.assertThat((Optional)testInstance.getRootCauseExecution()).isPresent();
        Assertions.assertThat(testInstance.getRootCauseExecution().get()).isSameAs((Object)rootCauseExecutionVertex.getCurrentExecutionAttempt());
    }

    @Test
    void testLocalFailureHandlingResultSnapshotCreation() {
        ExecutionVertex rootCauseExecutionVertex = this.extractExecutionVertex(0);
        RuntimeException rootCause = new RuntimeException("Expected exception: root cause");
        ExecutionVertex otherFailedExecutionVertex = this.extractExecutionVertex(1);
        IllegalStateException otherFailure = new IllegalStateException("Expected exception: other failure");
        long rootCauseTimestamp = this.triggerFailure(rootCauseExecutionVertex, rootCause);
        this.triggerFailure(otherFailedExecutionVertex, otherFailure);
        FailureHandlingResult failureHandlingResult = FailureHandlingResult.restartable((Execution)rootCauseExecutionVertex.getCurrentExecutionAttempt(), (Throwable)rootCause, (long)rootCauseTimestamp, (CompletableFuture)FailureEnricherUtils.EMPTY_FAILURE_LABELS, StreamSupport.stream(this.executionGraph.getAllExecutionVertices().spliterator(), false).map(ExecutionVertex::getID).collect(Collectors.toSet()), (long)0L, (boolean)false);
        FailureHandlingResultSnapshot testInstance = FailureHandlingResultSnapshot.create((FailureHandlingResult)failureHandlingResult, this::getCurrentExecutions);
        Assertions.assertThat((Throwable)testInstance.getRootCause()).isSameAs((Object)rootCause);
        Assertions.assertThat((long)testInstance.getTimestamp()).isEqualTo(rootCauseTimestamp);
        Assertions.assertThat((Optional)testInstance.getRootCauseExecution()).isPresent();
        Assertions.assertThat(testInstance.getRootCauseExecution().get()).isSameAs((Object)rootCauseExecutionVertex.getCurrentExecutionAttempt());
        Assertions.assertThat((Iterable)testInstance.getConcurrentlyFailedExecution()).containsExactly((Object[])new Execution[]{otherFailedExecutionVertex.getCurrentExecutionAttempt()});
    }

    @Test
    void testFailureHandlingWithRootCauseExecutionBeingPartOfConcurrentlyFailedExecutions() {
        Execution rootCauseExecution = this.extractExecutionVertex(0).getCurrentExecutionAttempt();
        Assertions.assertThatThrownBy(() -> new FailureHandlingResultSnapshot(rootCauseExecution, (Throwable)new RuntimeException("Expected exception"), System.currentTimeMillis(), FailureEnricherUtils.EMPTY_FAILURE_LABELS, Collections.singleton(rootCauseExecution))).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testGlobalFailureHandlingResultSnapshotCreation() throws ExecutionException, InterruptedException {
        FlinkException rootCause = new FlinkException("Expected exception: root cause");
        long timestamp = System.currentTimeMillis();
        ExecutionVertex failedExecutionVertex0 = this.extractExecutionVertex(0);
        RuntimeException failure0 = new RuntimeException("Expected exception: failure #0");
        ExecutionVertex failedExecutionVertex1 = this.extractExecutionVertex(1);
        IllegalStateException failure1 = new IllegalStateException("Expected exception: failure #1");
        this.triggerFailure(failedExecutionVertex0, failure0);
        this.triggerFailure(failedExecutionVertex1, failure1);
        FailureHandlingResult failureHandlingResult = FailureHandlingResult.restartable(null, (Throwable)rootCause, (long)timestamp, CompletableFuture.completedFuture(Collections.singletonMap("key2", "value2")), StreamSupport.stream(this.executionGraph.getAllExecutionVertices().spliterator(), false).map(ExecutionVertex::getID).collect(Collectors.toSet()), (long)0L, (boolean)true);
        Assertions.assertThat((Map)((Map)failureHandlingResult.getFailureLabels().get())).isEqualTo(Collections.singletonMap("key2", "value2"));
        FailureHandlingResultSnapshot testInstance = FailureHandlingResultSnapshot.create((FailureHandlingResult)failureHandlingResult, this::getCurrentExecutions);
        Assertions.assertThat((Throwable)testInstance.getRootCause()).isSameAs((Object)rootCause);
        Assertions.assertThat((long)testInstance.getTimestamp()).isEqualTo(timestamp);
        Assertions.assertThat((Optional)testInstance.getRootCauseExecution()).isNotPresent();
        Assertions.assertThat((Iterable)testInstance.getConcurrentlyFailedExecution()).containsExactlyInAnyOrder((Object[])new Execution[]{failedExecutionVertex0.getCurrentExecutionAttempt(), failedExecutionVertex1.getCurrentExecutionAttempt()});
    }

    private Collection<Execution> getCurrentExecutions(ExecutionVertexID executionVertexId) {
        if (!this.executionGraph.getAllVertices().containsKey(executionVertexId.getJobVertexId())) {
            throw new IllegalArgumentException("The ExecutionJobVertex having the ID " + executionVertexId.getJobVertexId() + " does not exist.");
        }
        ExecutionVertex[] executions = ((ExecutionJobVertex)this.executionGraph.getAllVertices().get(executionVertexId.getJobVertexId())).getTaskVertices();
        if (executions.length <= executionVertexId.getSubtaskIndex()) {
            throw new IllegalArgumentException("The ExecutionVertex having the subtask ID " + executionVertexId.getSubtaskIndex() + " for ExecutionJobVertex " + executionVertexId.getJobVertexId() + " does not exist.");
        }
        return executions[executionVertexId.getSubtaskIndex()].getCurrentExecutions();
    }

    private long triggerFailure(ExecutionVertex executionVertex, Throwable throwable) {
        this.executionGraph.updateState(new TaskExecutionStateTransition(new TaskExecutionState(executionVertex.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED, throwable)));
        return ((ErrorInfo)executionVertex.getFailureInfo().orElseThrow(() -> new IllegalArgumentException("The transition into failed state didn't succeed for ExecutionVertex " + executionVertex.getID() + "."))).getTimestamp();
    }

    private ExecutionVertex extractExecutionVertex(int pos) {
        ExecutionVertex executionVertex = (ExecutionVertex)Iterables.get((Iterable)this.executionGraph.getAllExecutionVertices(), (int)pos);
        executionVertex.tryAssignResource((LogicalSlot)new TestingLogicalSlotBuilder().createTestingLogicalSlot());
        return executionVertex;
    }
}

