/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests that exercise {@link ExecutionVertex#cancel()} and {@link Execution#cancel()} from
 * different starting execution states and with different outcomes of the cancel RPC.
 *
 * <p>Tests declare {@code throws Exception} instead of catching and re-reporting exceptions, so
 * that an unexpected exception fails the test with its original type, message and stack trace.
 */
class ExecutionVertexCancelTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /** Cancelling a vertex that is still {@code CREATED} must end in {@code CANCELED}. */
    @Test
    void testCancelFromCreated() throws Exception {
        ExecutionVertex vertex = ExecutionGraphTestUtils.getExecutionVertex();
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED);

        vertex.cancel();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /** Cancelling a vertex that was forced into {@code SCHEDULED} must end in {@code CANCELED}. */
    @Test
    void testCancelFromScheduled() throws Exception {
        ExecutionVertex vertex = ExecutionGraphTestUtils.getExecutionVertex();
        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.SCHEDULED);
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.SCHEDULED);

        vertex.cancel();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /**
     * Cancelling a {@code RUNNING} vertex and then completing the cancellation must end in
     * {@code CANCELED} with the slot no longer alive.
     */
    @Test
    void testCancelFromRunning() throws Exception {
        ExecutionVertex vertex = ExecutionGraphTestUtils.getExecutionVertex();
        TestingLogicalSlot slot = createSlotWithCancelGateway(1);
        ExecutionGraphTestUtils.setVertexResource(vertex, slot);
        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.RUNNING);

        vertex.cancel();
        vertex.getCurrentExecutionAttempt().completeCancelling();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(slot.isAlive()).isFalse();
        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /**
     * Calling {@code cancel()} twice on a {@code RUNNING} vertex must leave it in
     * {@code CANCELING} after each call; completing the cancellation then yields
     * {@code CANCELED}. The gateway used here acknowledges only its first cancel RPC.
     */
    @Test
    void testRepeatedCancelFromRunning() throws Exception {
        ExecutionVertex vertex = ExecutionGraphTestUtils.getExecutionVertex();
        TestingLogicalSlot slot = createSlotWithCancelGateway(1);
        ExecutionGraphTestUtils.setVertexResource(vertex, slot);
        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.RUNNING);

        vertex.cancel();
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELING);

        vertex.cancel();
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELING);

        vertex.getCurrentExecutionAttempt().completeCancelling();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(slot.isAlive()).isFalse();
        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /**
     * After an acknowledged cancel RPC and without any further completion signal, a formerly
     * {@code RUNNING} vertex must stay in {@code CANCELING} and report no failure.
     */
    @Test
    void testCancelFromRunningDidNotFindTask() throws Exception {
        ExecutionVertex vertex = ExecutionGraphTestUtils.getExecutionVertex();
        TestingLogicalSlot slot = createSlotWithCancelGateway(1);
        ExecutionGraphTestUtils.setVertexResource(vertex, slot);
        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.RUNNING);

        vertex.cancel();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELING);
        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING);
    }

    /**
     * When the very first cancel RPC fails (gateway configured with zero successful
     * operations), the vertex is expected to end in {@code CANCELED} with the slot no longer
     * alive.
     */
    @Test
    void testCancelCallFails() throws Exception {
        ExecutionVertex vertex = ExecutionGraphTestUtils.getExecutionVertex();
        TestingLogicalSlot slot = createSlotWithCancelGateway(0);
        ExecutionGraphTestUtils.setVertexResource(vertex, slot);
        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.RUNNING);

        vertex.cancel();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        Assertions.assertThat(slot.isAlive()).isFalse();
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING);
    }

    /**
     * Cancels one execution of a running ten-vertex job and then marks it as failed. The
     * execution must end in {@code FAILED} or {@code CANCELED}, its slot must no longer be
     * alive, and it must be removed from the graph's registered executions.
     */
    @Test
    void testSendCancelAndReceiveFail() throws Exception {
        DefaultScheduler scheduler = SchedulerTestingUtils.createScheduler(
                JobGraphTestUtils.streamingJobGraph(ExecutionGraphTestUtils.createNoOpVertex(10)),
                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                EXECUTOR_RESOURCE.getExecutor());
        ExecutionGraph graph = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(graph);
        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.RUNNING);

        ExecutionVertex[] vertices = graph.getVerticesTopologically().iterator().next().getTaskVertices();
        Assertions.assertThat(graph.getRegisteredExecutions()).hasSize(vertices.length);

        Execution exec = vertices[3].getCurrentExecutionAttempt();
        exec.cancel();
        Assertions.assertThat(exec.getState()).isEqualTo(ExecutionState.CANCELING);

        exec.markFailed(new Exception("test"));

        // Either terminal state is accepted here; isIn reports the actual state on failure.
        Assertions.assertThat(exec.getState()).isIn(ExecutionState.FAILED, ExecutionState.CANCELED);
        Assertions.assertThat(exec.getAssignedResource().isAlive()).isFalse();
        Assertions.assertThat(graph.getRegisteredExecutions()).hasSize(vertices.length - 1);
    }

    /**
     * Builds a testing slot whose task manager gateway acknowledges only the first
     * {@code successfulOperations} cancel RPCs.
     */
    private static TestingLogicalSlot createSlotWithCancelGateway(int successfulOperations) {
        return new TestingLogicalSlotBuilder()
                .setTaskManagerGateway(new CancelSequenceSimpleAckingTaskManagerGateway(successfulOperations))
                .createTestingLogicalSlot();
    }

    /** Asserts that the vertex recorded a positive timestamp for each of the given states. */
    private static void assertStateTimestampsSet(ExecutionVertex vertex, ExecutionState... states) {
        for (ExecutionState state : states) {
            Assertions.assertThat(vertex.getStateTimestamp(state))
                    .as("state timestamp for %s", state)
                    .isGreaterThan(0L);
        }
    }

    /**
     * Task manager gateway whose {@code cancelTask} acknowledges the first
     * {@code successfulOperations} invocations and completes every later invocation
     * exceptionally with an {@link IOException}.
     */
    private static class CancelSequenceSimpleAckingTaskManagerGateway
    extends SimpleAckingTaskManagerGateway {
        /** Number of leading {@code cancelTask} invocations that are acknowledged. */
        private final int successfulOperations;
        /** Number of {@code cancelTask} invocations seen so far. */
        private int cancelInvocations;

        public CancelSequenceSimpleAckingTaskManagerGateway(int successfulOperations) {
            this.successfulOperations = successfulOperations;
        }

        @Override
        public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Duration timeout) {
            // Zero-based position of this call in the sequence of cancel RPCs.
            int invocation = this.cancelInvocations;
            this.cancelInvocations = invocation + 1;
            if (invocation < this.successfulOperations) {
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            return FutureUtils.completedExceptionally(new IOException("Rpc call fails"));
        }
    }
}

