/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests for {@link Execution}: slot handling on cancellation and release, clearing of the task
 * restore state after deployment, and cancellation while the task restore is being offloaded.
 */
class ExecutionTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    @RegisterExtension
    static final TestingComponentMainThreadExecutor.Extension MAIN_EXECUTOR_RESOURCE =
            new TestingComponentMainThreadExecutor.Extension();

    private final TestingComponentMainThreadExecutor testMainThreadUtil =
            MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();

    /**
     * Checks that the slot future handed out by the slot provider is already done by the time the
     * termination future returned from {@link ExecutionVertex#cancel()} completes.
     */
    @Test
    void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
        DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();
        ExecutionJobVertex executionJobVertex = scheduler.getExecutionJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];

        scheduler.startScheduling();

        Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
        CompletableFuture<TestingPhysicalSlot> returnedSlotFuture =
                physicalSlotProvider.getFirstResponseOrFail();
        CompletableFuture<?> terminationFuture = executionVertex.cancel();

        currentExecutionAttempt.completeCancelling();

        // The assertion runs as a continuation of the termination future, so it observes the
        // state of the slot future at the moment termination is signalled.
        CompletableFuture<Boolean> restartFuture =
                terminationFuture.thenApply(
                        ignored -> {
                            Assertions.assertThat(returnedSlotFuture).isDone();
                            return true;
                        });

        // Propagates an assertion failure from the continuation, if any.
        restartFuture.get();
    }

    /**
     * Checks that the task restore state set via {@link Execution#setInitialState} is eventually
     * cleared (becomes {@code null}) once scheduling has been started.
     */
    @Test
    void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                testMainThreadUtil.getMainThreadExecutor(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        TestingPhysicalSlotProvider
                                                .createWithLimitedAmountOfPhysicalSlots(1)))
                        .build();
        ExecutionJobVertex executionJobVertex = scheduler.getExecutionJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        Execution execution = executionVertex.getCurrentExecutionAttempt();

        JobManagerTaskRestore taskRestoreState =
                new JobManagerTaskRestore(1L, new TaskStateSnapshot());
        execution.setInitialState(taskRestoreState);
        Assertions.assertThat(execution.getTaskRestore()).isNotNull();

        testMainThreadUtil.execute(scheduler::startScheduling);

        // Busy-wait until the restore state has been cleared. The loop has no timeout of its
        // own: the test passes when the state becomes null and otherwise hangs until an outer
        // (e.g. CI-level) timeout fires.
        while (execution.getTaskRestore() != null) {
            Thread.sleep(10L);
        }
    }

    /**
     * Checks that, after an execution has been scheduled and then cancelled, the set of slot
     * requests seen by the slot provider equals the set of cancelled requests.
     */
    @Test
    void testCanceledExecutionReturnsSlot() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.create(
                        resourceProfile ->
                                CompletableFuture.completedFuture(
                                        TestingPhysicalSlot.builder()
                                                .withTaskManagerGateway(taskManagerGateway)
                                                .build()));
        DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                testMainThreadUtil.getMainThreadExecutor(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();
        ExecutionJobVertex executionJobVertex = scheduler.getExecutionJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        Execution execution = executionVertex.getCurrentExecutionAttempt();

        // When the gateway receives a cancel call for this attempt, finish the cancellation
        // of the execution right away.
        taskManagerGateway.setCancelConsumer(
                executionAttemptID -> {
                    if (execution.getAttemptId().equals(executionAttemptID)) {
                        execution.completeCancelling();
                    }
                });

        testMainThreadUtil.execute(scheduler::startScheduling);
        testMainThreadUtil.execute(execution::cancel);

        Assertions.assertThat(physicalSlotProvider.getRequests().keySet())
                .isEqualTo(physicalSlotProvider.getCancellations().keySet());
    }

    /**
     * Checks that releasing the payload of the physical slot assigned to an execution leaves the
     * execution's release future done by the time {@code releasePayload} returns.
     */
    @Test
    void testSlotReleaseAtomicallyReleasesExecution() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
        DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                testMainThreadUtil.getMainThreadExecutor(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();
        Execution execution =
                scheduler
                        .getExecutionJobVertex(jobVertex.getID())
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt();

        testMainThreadUtil.execute(scheduler::startScheduling);

        // Wait for the slot request to arrive before grabbing the slot that answered it.
        physicalSlotProvider.awaitAllSlotRequests();
        TestingPhysicalSlot physicalSlot = physicalSlotProvider.getFirstResponseOrFail().get();

        testMainThreadUtil.execute(
                () -> {
                    Assertions.assertThat(execution.getAssignedAllocationID())
                            .isEqualTo(physicalSlot.getAllocationId());

                    physicalSlot.releasePayload(new FlinkException("Test exception"));

                    Assertions.assertThat(execution.getReleaseFuture()).isDone();
                });
    }

    /**
     * Checks that cancelling an execution while the blob writer is blocked inside
     * {@code putPermanent} makes the future returned by
     * {@code getTddCreationDuringDeployFuture()} complete exceptionally, with an
     * {@link IllegalStateException} as the cause that mentions the state switch during task
     * restore offload.
     */
    @Test
    void testExecutionCancelledDuringTaskRestoreOffload() {
        CountDownLatch offloadLatch = new CountDownLatch(1);

        Execution cancelledExecution =
                testMainThreadUtil.execute(
                        () -> {
                            ConditionalLatchBlockingBlobWriter blobWriter =
                                    new ConditionalLatchBlockingBlobWriter(offloadLatch);
                            JobVertex jobVertex = createNoOpJobVertex();
                            JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
                            DefaultExecutionGraph executionGraph =
                                    TestingDefaultExecutionGraphBuilder.newBuilder()
                                            .setJobGraph(jobGraph)
                                            .setBlobWriter(blobWriter)
                                            .build(EXECUTOR_RESOURCE.getExecutor());
                            executionGraph.start(testMainThreadUtil.getMainThreadExecutor());

                            ExecutionJobVertex executionJobVertex =
                                    executionGraph.getJobVertex(jobVertex.getID());
                            ExecutionVertex executionVertex =
                                    executionJobVertex.getTaskVertices()[0];
                            Execution execution = executionVertex.getCurrentExecutionAttempt();

                            JobManagerTaskRestore taskRestoreState =
                                    new JobManagerTaskRestore(1L, new TaskStateSnapshot());
                            execution.setInitialState(taskRestoreState);
                            execution.tryAssignResource(
                                    new TestingLogicalSlotBuilder().createTestingLogicalSlot());
                            execution.transitionState(ExecutionState.SCHEDULED);

                            // Only start blocking now, so that any blob writes performed during
                            // the setup above pass through unhindered.
                            blobWriter.enableBlocking();

                            execution.deploy();
                            execution.cancel();
                            return execution;
                        });

        // The execution has been cancelled; now let the blocked blob write proceed.
        offloadLatch.countDown();

        cancelledExecution
                .getTddCreationDuringDeployFuture()
                .handle(
                        (result, exception) -> {
                            // The description must be set before the assertion it describes;
                            // AssertJ ignores a description set after the assertion has run.
                            Assertions.assertThat(exception)
                                    .describedAs("Expected IllegalStateException to be thrown")
                                    .isNotNull();

                            Throwable rootCause = exception.getCause();
                            Assertions.assertThat(rootCause)
                                    .isInstanceOf(IllegalStateException.class);
                            Assertions.assertThat(rootCause.getMessage())
                                    .contains("execution state has switched");
                            Assertions.assertThat(rootCause.getMessage())
                                    .contains("during task restore offload");
                            return null;
                        })
                .join();
    }

    /** Creates a job vertex named "Test vertex" that runs {@link NoOpInvokable}. */
    @Nonnull
    private JobVertex createNoOpJobVertex() {
        JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return jobVertex;
    }

    /**
     * A {@link TestingBlobWriter} whose {@link #putPermanent(JobID, InputStream)} waits on a latch
     * before delegating to the superclass, but only after {@link #enableBlocking()} was called.
     */
    private static class ConditionalLatchBlockingBlobWriter extends TestingBlobWriter {
        private final CountDownLatch offloadLatch;

        // volatile: the flag is written through enableBlocking() and read in putPermanent(),
        // which may be invoked from a different thread.
        private volatile boolean blockingEnabled = false;

        /**
         * @param offloadLatch latch that {@code putPermanent} awaits once blocking is enabled
         */
        public ConditionalLatchBlockingBlobWriter(CountDownLatch offloadLatch) {
            this.offloadLatch = offloadLatch;
        }

        /** Makes subsequent {@code putPermanent} calls wait for the latch. */
        public void enableBlocking() {
            this.blockingEnabled = true;
        }

        /**
         * Waits for the latch if blocking is enabled, then delegates to the superclass.
         *
         * @throws IOException if the thread is interrupted while waiting for the latch (the
         *     interrupt flag is restored first), or if the superclass call throws
         */
        @Override
        public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream)
                throws IOException {
            if (this.blockingEnabled) {
                try {
                    this.offloadLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for offload latch", e);
                }
            }
            return super.putPermanent(jobId, inputStream);
        }
    }
}

