package org.apache.flink.runtime.executiongraph;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.deployment.CachedShuffleDescriptors;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorTestUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.NotThrownAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.class */
/**
 * Tests that the cached shuffle descriptors of a consumed intermediate result are dropped once
 * the consuming tasks finish or are restarted after a failure.
 *
 * <p>Each scenario is run for {@link DistributionPattern#ALL_TO_ALL} and {@link
 * DistributionPattern#POINTWISE} edges, once with an offloading threshold of {@link
 * Integer#MAX_VALUE} (no blobs expected) and once with a threshold of {@code 0} (blobs expected
 * in the {@link TestingBlobWriter}).
 */
class RemoveCachedShuffleDescriptorTest {

    /** Parallelism of both job vertices used by every test in this class. */
    private static final int PARALLELISM = 4;

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /** Single-threaded executor that backs {@link #mainThreadExecutor}. */
    private ScheduledExecutorService scheduledExecutorService;

    private ComponentMainThreadExecutor mainThreadExecutor;

    /** IO executor whose queued tasks only run when {@code triggerAll()} is called. */
    private ManuallyTriggeredScheduledExecutorService ioExecutor;

    @BeforeEach
    void setup() {
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor =
                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                        scheduledExecutorService);
        ioExecutor = new ManuallyTriggeredScheduledExecutorService();
    }

    @AfterEach
    void teardown() {
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdownNow();
        }
    }

    // ------------------------------------------------------------------------
    //  ALL_TO_ALL edge, consumer finished
    // ------------------------------------------------------------------------

    @Test
    void testRemoveNonOffloadedCacheForAllToAllEdgeAfterFinished() throws Exception {
        testRemoveCacheForAllToAllEdgeAfterFinished(
                new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 0, 0);
    }

    @Test
    void testRemoveOffloadedCacheForAllToAllEdgeAfterFinished() throws Exception {
        // The 4 below is the expected blob count, not the parallelism.
        testRemoveCacheForAllToAllEdgeAfterFinished(new TestingBlobWriter(0), 0, 4, 3);
    }

    /**
     * Deploys two vertices connected by an ALL_TO_ALL edge, finishes the whole consumer job
     * vertex and checks that the consumer's cached shuffle descriptors are gone afterwards.
     *
     * @param blobWriter blob writer handed to the scheduler and inspected for its blob count
     * @param offloadShuffleDescriptorsThreshold value for {@link
     *     TaskDeploymentDescriptorFactory#OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD}
     * @param expectedBlobsBefore expected blob count right after deployment
     * @param expectedBlobsAfter expected blob count after the cache was removed
     */
    private void testRemoveCacheForAllToAllEdgeAfterFinished(
            TestingBlobWriter blobWriter,
            int offloadShuffleDescriptorsThreshold,
            int expectedBlobsBefore,
            int expectedBlobsAfter)
            throws Exception {
        final JobID jobId = new JobID();
        final JobVertex producer = createVertex("v1");
        final JobVertex consumer = createVertex("v2");

        final ExecutionGraph executionGraph =
                createSchedulerAndDeploy(
                                jobId,
                                producer,
                                consumer,
                                DistributionPattern.ALL_TO_ALL,
                                blobWriter,
                                createConfiguration(offloadShuffleDescriptorsThreshold))
                        .getExecutionGraph();

        executionInMainThread(
                () -> {
                    assertCachedDescriptorCount(
                            executionGraph, consumer, 0, jobId, blobWriter, PARALLELISM);
                    assertBlobCount(blobWriter, expectedBlobsBefore);
                });

        runInMainThread(
                () -> ExecutionGraphTestUtils.finishJobVertex(executionGraph, consumer.getID()));
        ioExecutor.triggerAll();

        executionInMainThread(
                () -> {
                    assertCacheRemoved(executionGraph, consumer, 0);
                    assertBlobCount(blobWriter, expectedBlobsAfter);
                });
    }

    // ------------------------------------------------------------------------
    //  ALL_TO_ALL edge, global failover
    // ------------------------------------------------------------------------

    @Test
    void testRemoveNonOffloadedCacheForAllToAllEdgeAfterFailover() throws Exception {
        testRemoveCacheForAllToAllEdgeAfterFailover(
                new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 0, 0);
    }

    @Test
    void testRemoveOffloadedCacheForAllToAllEdgeAfterFailover() throws Exception {
        // The 4 below is the expected blob count, not the parallelism.
        testRemoveCacheForAllToAllEdgeAfterFailover(new TestingBlobWriter(0), 0, 4, 3);
    }

    /**
     * Deploys two vertices connected by an ALL_TO_ALL edge, triggers a global failure and checks
     * that the consumer's cached shuffle descriptors are gone afterwards.
     *
     * @param blobWriter blob writer handed to the scheduler and inspected for its blob count
     * @param offloadShuffleDescriptorsThreshold value for {@link
     *     TaskDeploymentDescriptorFactory#OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD}
     * @param expectedBlobsBefore expected blob count right after deployment
     * @param expectedBlobsAfter expected blob count after the cache was removed
     */
    private void testRemoveCacheForAllToAllEdgeAfterFailover(
            TestingBlobWriter blobWriter,
            int offloadShuffleDescriptorsThreshold,
            int expectedBlobsBefore,
            int expectedBlobsAfter)
            throws Exception {
        final JobID jobId = new JobID();
        final JobVertex producer = createVertex("v1");
        final JobVertex consumer = createVertex("v2");

        final SchedulerBase scheduler =
                createSchedulerAndDeploy(
                        jobId,
                        producer,
                        consumer,
                        DistributionPattern.ALL_TO_ALL,
                        blobWriter,
                        createConfiguration(offloadShuffleDescriptorsThreshold));
        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();

        executionInMainThread(
                () -> {
                    assertCachedDescriptorCount(
                            executionGraph, consumer, 0, jobId, blobWriter, PARALLELISM);
                    assertBlobCount(blobWriter, expectedBlobsBefore);
                });

        triggerGlobalFailoverAndComplete(scheduler, producer, consumer);
        ioExecutor.triggerAll();

        executionInMainThread(
                () -> {
                    assertCacheRemoved(executionGraph, consumer, 0);
                    assertBlobCount(blobWriter, expectedBlobsAfter);
                });
    }

    // ------------------------------------------------------------------------
    //  POINTWISE edge, single consumer subtask finished
    // ------------------------------------------------------------------------

    @Test
    void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFinished() throws Exception {
        testRemoveCacheForPointwiseEdgeAfterFinished(
                new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 0, 0);
    }

    @Test
    void testRemoveOffloadedCacheForPointwiseEdgeAfterFinished() throws Exception {
        testRemoveCacheForPointwiseEdgeAfterFinished(new TestingBlobWriter(0), 0, 7, 6);
    }

    /**
     * Deploys two vertices connected by a POINTWISE edge, finishes only the first consumer
     * subtask and checks that the cache for that subtask is gone while the cache for the second
     * subtask is still present.
     *
     * @param blobWriter blob writer handed to the scheduler and inspected for its blob count
     * @param offloadShuffleDescriptorsThreshold value for {@link
     *     TaskDeploymentDescriptorFactory#OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD}
     * @param expectedBlobsBefore expected blob count right after deployment
     * @param expectedBlobsAfter expected blob count after the first subtask's cache was removed
     */
    private void testRemoveCacheForPointwiseEdgeAfterFinished(
            TestingBlobWriter blobWriter,
            int offloadShuffleDescriptorsThreshold,
            int expectedBlobsBefore,
            int expectedBlobsAfter)
            throws Exception {
        final JobID jobId = new JobID();
        final JobVertex producer = createVertex("v1");
        final JobVertex consumer = createVertex("v2");

        final ExecutionGraph executionGraph =
                createSchedulerAndDeploy(
                                jobId,
                                producer,
                                consumer,
                                DistributionPattern.POINTWISE,
                                blobWriter,
                                createConfiguration(offloadShuffleDescriptorsThreshold))
                        .getExecutionGraph();

        executionInMainThread(
                () -> {
                    assertCachedDescriptorCount(executionGraph, consumer, 0, jobId, blobWriter, 1);
                    assertBlobCount(blobWriter, expectedBlobsBefore);
                });

        final ExecutionVertex firstConsumerSubtask = getTaskVertices(executionGraph, consumer)[0];
        runInMainThread(
                () ->
                        ExecutionGraphTestUtils.finishExecutionVertex(
                                executionGraph, firstConsumerSubtask));
        ioExecutor.triggerAll();

        executionInMainThread(
                () -> {
                    assertCacheRemoved(executionGraph, consumer, 0);
                    assertCachedDescriptorCount(executionGraph, consumer, 1, jobId, blobWriter, 1);
                    assertBlobCount(blobWriter, expectedBlobsAfter);
                });
    }

    // ------------------------------------------------------------------------
    //  POINTWISE edge, single consumer subtask failed
    // ------------------------------------------------------------------------

    @Test
    void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFailover() throws Exception {
        testRemoveCacheForPointwiseEdgeAfterFailover(
                new TestingBlobWriter(Integer.MAX_VALUE), Integer.MAX_VALUE, 0, 0);
    }

    @Test
    void testRemoveOffloadedCacheForPointwiseEdgeAfterFailover() throws Exception {
        testRemoveCacheForPointwiseEdgeAfterFailover(new TestingBlobWriter(0), 0, 7, 6);
    }

    /**
     * Deploys two vertices connected by a POINTWISE edge, fails the first consumer subtask with
     * a {@link PartitionNotFoundException} and checks that the cache for that subtask is gone
     * while the cache for the second subtask is still present.
     *
     * @param blobWriter blob writer handed to the scheduler and inspected for its blob count
     * @param offloadShuffleDescriptorsThreshold value for {@link
     *     TaskDeploymentDescriptorFactory#OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD}
     * @param expectedBlobsBefore expected blob count right after deployment
     * @param expectedBlobsAfter expected blob count after the first subtask's cache was removed
     */
    private void testRemoveCacheForPointwiseEdgeAfterFailover(
            TestingBlobWriter blobWriter,
            int offloadShuffleDescriptorsThreshold,
            int expectedBlobsBefore,
            int expectedBlobsAfter)
            throws Exception {
        final JobID jobId = new JobID();
        final JobVertex producer = createVertex("v1");
        final JobVertex consumer = createVertex("v2");

        final ExecutionGraph executionGraph =
                createSchedulerAndDeploy(
                                jobId,
                                producer,
                                consumer,
                                DistributionPattern.POINTWISE,
                                blobWriter,
                                createConfiguration(offloadShuffleDescriptorsThreshold))
                        .getExecutionGraph();

        executionInMainThread(
                () -> {
                    assertCachedDescriptorCount(executionGraph, consumer, 0, jobId, blobWriter, 1);
                    assertBlobCount(blobWriter, expectedBlobsBefore);
                });

        triggerExceptionAndComplete(executionGraph, producer, consumer);
        ioExecutor.triggerAll();

        executionInMainThread(
                () -> {
                    assertCacheRemoved(executionGraph, consumer, 0);
                    assertCachedDescriptorCount(executionGraph, consumer, 1, jobId, blobWriter, 1);
                    assertBlobCount(blobWriter, expectedBlobsAfter);
                });
    }

    // ------------------------------------------------------------------------
    //  Set-up helpers
    // ------------------------------------------------------------------------

    /** Creates a no-op job vertex with the parallelism shared by all tests. */
    private static JobVertex createVertex(String name) {
        return ExecutionGraphTestUtils.createNoOpVertex(name, PARALLELISM);
    }

    /** Creates a configuration that only sets the shuffle descriptor offloading threshold. */
    private static Configuration createConfiguration(int offloadShuffleDescriptorsThreshold) {
        final Configuration configuration = new Configuration();
        configuration.set(
                TaskDeploymentDescriptorFactory.OFFLOAD_SHUFFLE_DESCRIPTORS_THRESHOLD,
                offloadShuffleDescriptorsThreshold);
        return configuration;
    }

    /**
     * Delegates to {@link SchedulerTestingUtils#createSchedulerAndDeploy}, wiring in this test's
     * main-thread executor, manually triggered IO executor and shared executor extension.
     */
    private SchedulerBase createSchedulerAndDeploy(
            JobID jobId,
            JobVertex producer,
            JobVertex consumer,
            DistributionPattern distributionPattern,
            BlobWriter blobWriter,
            Configuration configuration)
            throws Exception {
        return SchedulerTestingUtils.createSchedulerAndDeploy(
                false,
                jobId,
                producer,
                new JobVertex[] {consumer},
                distributionPattern,
                blobWriter,
                mainThreadExecutor,
                ioExecutor,
                NoOpJobMasterPartitionTracker.INSTANCE,
                EXECUTOR_RESOURCE.getExecutor(),
                configuration);
    }

    // ------------------------------------------------------------------------
    //  Failure injection
    // ------------------------------------------------------------------------

    /**
     * Reports a global failure to the scheduler, completes the cancellation of every task of
     * {@code consumer} and then waits until every task of {@code producer} is {@link
     * ExecutionState#DEPLOYING} again.
     */
    private void triggerGlobalFailoverAndComplete(
            SchedulerBase scheduler, JobVertex producer, JobVertex consumer)
            throws TimeoutException {
        final Exception failure = new Exception();
        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();

        runInMainThread(
                () -> {
                    scheduler.handleGlobalFailure(failure);
                    for (ExecutionVertex task : getTaskVertices(executionGraph, consumer)) {
                        task.getCurrentExecutionAttempt().completeCancelling();
                    }
                });

        for (ExecutionVertex task : getTaskVertices(executionGraph, producer)) {
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(
                    task, ExecutionState.DEPLOYING, 1000L);
        }
    }

    /**
     * Marks the first task of {@code consumer} as failed with a {@link
     * PartitionNotFoundException} and then waits until the first task of {@code producer} is
     * {@link ExecutionState#DEPLOYING}.
     */
    private void triggerExceptionAndComplete(
            ExecutionGraph executionGraph, JobVertex producer, JobVertex consumer)
            throws TimeoutException {
        final ExecutionVertex firstProducerTask = getTaskVertices(executionGraph, producer)[0];
        final ExecutionVertex firstConsumerTask = getTaskVertices(executionGraph, consumer)[0];

        runInMainThread(
                () ->
                        firstConsumerTask.markFailed(
                                new PartitionNotFoundException(new ResultPartitionID())));

        ExecutionGraphTestUtils.waitUntilExecutionVertexState(
                firstProducerTask, ExecutionState.DEPLOYING, 1000L);
    }

    // ------------------------------------------------------------------------
    //  Main-thread execution
    // ------------------------------------------------------------------------

    /** Runs {@code action} on the main-thread executor and blocks until it has completed. */
    private void runInMainThread(Runnable action) {
        CompletableFuture.runAsync(action, mainThreadExecutor).join();
    }

    /**
     * Runs {@code runnableWithException} on the main-thread executor, asserting that it throws
     * nothing, and blocks until it has completed.
     */
    private void executionInMainThread(RunnableWithException runnableWithException) {
        runInMainThread(
                () -> {
                    final NotThrownAssert noException = Assertions.assertThatNoException();
                    noException.isThrownBy(runnableWithException::run);
                });
    }

    // ------------------------------------------------------------------------
    //  Assertions and lookups
    // ------------------------------------------------------------------------

    /**
     * Asserts that the cache consumed by the given consumer subtask deserializes to exactly
     * {@code expectedCount} shuffle descriptors.
     */
    private static void assertCachedDescriptorCount(
            ExecutionGraph executionGraph,
            JobVertex consumer,
            int consumerSubtaskIndex,
            JobID jobId,
            TestingBlobWriter blobWriter,
            int expectedCount)
            throws Exception {
        final CachedShuffleDescriptors cachedDescriptors =
                getConsumedCachedShuffleDescriptor(executionGraph, consumer, consumerSubtaskIndex);
        Assertions.assertThat(
                        TaskDeploymentDescriptorTestUtils.deserializeShuffleDescriptors(
                                cachedDescriptors.getAllSerializedShuffleDescriptorGroups(),
                                jobId,
                                blobWriter))
                .hasSize(expectedCount);
    }

    /** Asserts that no cache exists for the partition group consumed by the given subtask. */
    private static void assertCacheRemoved(
            ExecutionGraph executionGraph, JobVertex consumer, int consumerSubtaskIndex) {
        Assertions.assertThat(
                        getConsumedCachedShuffleDescriptor(
                                executionGraph, consumer, consumerSubtaskIndex))
                .isNull();
    }

    /** Asserts the number of blobs currently reported by the blob writer. */
    private static void assertBlobCount(TestingBlobWriter blobWriter, int expectedCount) {
        Assertions.assertThat(blobWriter.numberOfBlobs()).isEqualTo(expectedCount);
    }

    /** Returns the task vertices of {@code vertex}; fails if the graph does not know it. */
    private static ExecutionVertex[] getTaskVertices(
            ExecutionGraph executionGraph, JobVertex vertex) {
        return Objects.requireNonNull(executionGraph.getJobVertex(vertex.getID()))
                .getTaskVertices();
    }

    /** Looks up the cache for the partition group consumed by the first subtask. */
    private static CachedShuffleDescriptors getConsumedCachedShuffleDescriptor(
            ExecutionGraph executionGraph, JobVertex jobVertex) {
        return getConsumedCachedShuffleDescriptor(executionGraph, jobVertex, 0);
    }

    /**
     * Looks up, on the first input of {@code jobVertex}, the cache for the first partition group
     * consumed by subtask {@code consumerSubtaskIndex}.
     *
     * @return the cached descriptors; the tests in this class expect {@code null} once the cache
     *     has been removed
     */
    private static CachedShuffleDescriptors getConsumedCachedShuffleDescriptor(
            ExecutionGraph executionGraph, JobVertex jobVertex, int consumerSubtaskIndex) {
        final ExecutionJobVertex executionJobVertex =
                Objects.requireNonNull(executionGraph.getJobVertex(jobVertex.getID()));
        final IntermediateResult consumedResult = executionJobVertex.getInputs().get(0);
        return consumedResult.getCachedShuffleDescriptors(
                executionJobVertex.getTaskVertices()[consumerSubtaskIndex]
                        .getConsumedPartitionGroup(0));
    }
}
