/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactoryTest;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class RemoveCachedShuffleDescriptorTest
extends TestLogger {
    private static final int PARALLELISM = 4;
    private ScheduledExecutorService scheduledExecutorService;
    private ComponentMainThreadExecutor mainThreadExecutor;
    private ManuallyTriggeredScheduledExecutorService ioExecutor;

    @Before
    public void setup() {
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(this.scheduledExecutorService);
        this.ioExecutor = new ManuallyTriggeredScheduledExecutorService();
    }

    @After
    public void teardown() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
    }

    @Test
    public void testRemoveNonOffloadedCacheForAllToAllEdgeAfterFinished() throws Exception {
        this.testRemoveCacheForAllToAllEdgeAfterFinished(new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
    }

    @Test
    public void testRemoveOffloadedCacheForAllToAllEdgeAfterFinished() throws Exception {
        this.testRemoveCacheForAllToAllEdgeAfterFinished(new TestingBlobWriter(0), 4, 3);
    }

    private void testRemoveCacheForAllToAllEdgeAfterFinished(TestingBlobWriter blobWriter, int expectedBefore, int expectedAfter) throws Exception {
        JobID jobId = new JobID();
        JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", 4);
        JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", 4);
        DefaultScheduler scheduler = this.createSchedulerAndDeploy(jobId, v1, v2, DistributionPattern.ALL_TO_ALL, blobWriter);
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        ShuffleDescriptor[] shuffleDescriptors = TaskDeploymentDescriptorFactoryTest.deserializeShuffleDescriptors(RemoveCachedShuffleDescriptorTest.getConsumedCachedShuffleDescriptor(executionGraph, v2), jobId, blobWriter);
        Assert.assertEquals((long)4L, (long)shuffleDescriptors.length);
        Assert.assertEquals((long)expectedBefore, (long)blobWriter.numberOfBlobs());
        CompletableFuture.runAsync(() -> RemoveCachedShuffleDescriptorTest.transitionTasksToFinished(executionGraph, v2.getID()), (Executor)this.mainThreadExecutor).join();
        this.ioExecutor.triggerAll();
        Assert.assertNull(RemoveCachedShuffleDescriptorTest.getConsumedCachedShuffleDescriptor(executionGraph, v2));
        Assert.assertEquals((long)expectedAfter, (long)blobWriter.numberOfBlobs());
    }

    @Test
    public void testRemoveNonOffloadedCacheForAllToAllEdgeAfterFailover() throws Exception {
        this.testRemoveCacheForAllToAllEdgeAfterFailover(new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
    }

    @Test
    public void testRemoveOffloadedCacheForAllToAllEdgeAfterFailover() throws Exception {
        this.testRemoveCacheForAllToAllEdgeAfterFailover(new TestingBlobWriter(0), 4, 3);
    }

    private void testRemoveCacheForAllToAllEdgeAfterFailover(TestingBlobWriter blobWriter, int expectedBefore, int expectedAfter) throws Exception {
        JobID jobId = new JobID();
        JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", 4);
        JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", 4);
        DefaultScheduler scheduler = this.createSchedulerAndDeploy(jobId, v1, v2, DistributionPattern.ALL_TO_ALL, blobWriter);
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        ShuffleDescriptor[] shuffleDescriptors = TaskDeploymentDescriptorFactoryTest.deserializeShuffleDescriptors(RemoveCachedShuffleDescriptorTest.getConsumedCachedShuffleDescriptor(executionGraph, v2), jobId, blobWriter);
        Assert.assertEquals((long)4L, (long)shuffleDescriptors.length);
        Assert.assertEquals((long)expectedBefore, (long)blobWriter.numberOfBlobs());
        this.triggerGlobalFailoverAndComplete(scheduler, v1);
        this.ioExecutor.triggerAll();
        Assert.assertNull(RemoveCachedShuffleDescriptorTest.getConsumedCachedShuffleDescriptor(executionGraph, v2));
        Assert.assertEquals((long)expectedAfter, (long)blobWriter.numberOfBlobs());
    }

    @Test
    public void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFinished() throws Exception {
        this.testRemoveCacheForPointwiseEdgeAfterFinished(new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
    }

    @Test
    public void testRemoveOffloadedCacheForPointwiseEdgeAfterFinished() throws Exception {
        this.testRemoveCacheForPointwiseEdgeAfterFinished(new TestingBlobWriter(0), 7, 6);
    }

    private void testRemoveCacheForPointwiseEdgeAfterFinished(TestingBlobWriter blobWriter, int expectedBefore, int expectedAfter) throws Exception {
        JobID jobId = new JobID();
        JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", 4);
        JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", 4);
        DefaultScheduler scheduler = this.createSchedulerAndDeploy(jobId, v1, v2, DistributionPattern.POINTWISE, blobWriter);
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        ShuffleDescriptor[] shuffleDescriptors = TaskDeploymentDescriptorFactoryTest.deserializeShuffleDescriptors(RemoveCachedShuffleDescriptorTest.getConsumedCachedShuffleDescriptor(executionGraph, v2), jobId, blobWriter);
        Assert.assertEquals((long)1L, (long)shuffleDescriptors.length);
        Assert.assertEquals((long)expectedBefore, (long)blobWriter.numberOfBlobs());
        ExecutionVertex ev21 = Objects.requireNonNull(executionGraph.getJobVertex(v2.getID())).getTaskVertices()[0];
        CompletableFuture.runAsync(() -> RemoveCachedShuffleDescriptorTest.transitionTaskToFinished(executionGraph, ev21), (Executor)this.mainThreadExecutor).join();
        this.ioExecutor.triggerAll();
        Assert.assertNull(RemoveCachedShuffleDescriptorTest.getConsumedCachedShuffleDescriptor(executionGraph, v2, 0));
        ShuffleDescriptor[] shuffleDescriptorsForOtherVertex = TaskDeploymentDescriptorFactoryTest.deserializeShuffleDescriptors(RemoveCachedShuffleDescriptorTest.getConsumedCachedShuffleDescriptor(executionGraph, v2, 1), jobId, blobWriter);
        Assert.assertEquals((long)1L, (long)shuffleDescriptorsForOtherVertex.length);
        Assert.assertEquals((long)expectedAfter, (long)blobWriter.numberOfBlobs());
    }

    @Test
    public void testRemoveNonOffloadedCacheForPointwiseEdgeAfterFailover() throws Exception {
        this.testRemoveCacheForPointwiseEdgeAfterFailover(new TestingBlobWriter(Integer.MAX_VALUE), 0, 0);
    }

    @Test
    public void testRemoveOffloadedCacheForPointwiseEdgeAfterFailover() throws Exception {
        this.testRemoveCacheForPointwiseEdgeAfterFailover(new TestingBlobWriter(0), 7, 6);
    }

    private void testRemoveCacheForPointwiseEdgeAfterFailover(TestingBlobWriter blobWriter, int expectedBefore, int expectedAfter) throws Exception {
        JobID jobId = new JobID();
        JobVertex v1 = ExecutionGraphTestUtils.createNoOpVertex("v1", 4);
        JobVertex v2 = ExecutionGraphTestUtils.createNoOpVertex("v2", 4);
        DefaultScheduler scheduler = this.createSchedulerAndDeploy(jobId, v1, v2, DistributionPattern.POINTWISE, blobWriter);
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        ShuffleDescriptor[] shuffleDescriptors = TaskDeploymentDescriptorFactoryTest.deserializeShuffleDescriptors(RemoveCachedShuffleDescriptorTest.getConsumedCachedShuffleDescriptor(executionGraph, v2), jobId, blobWriter);
        Assert.assertEquals((long)1L, (long)shuffleDescriptors.length);
        Assert.assertEquals((long)expectedBefore, (long)blobWriter.numberOfBlobs());
        this.triggerExceptionAndComplete(executionGraph, v1, v2);
        this.ioExecutor.triggerAll();
        Assert.assertNull(RemoveCachedShuffleDescriptorTest.getConsumedCachedShuffleDescriptor(executionGraph, v2, 0));
        ShuffleDescriptor[] shuffleDescriptorsForOtherVertex = TaskDeploymentDescriptorFactoryTest.deserializeShuffleDescriptors(RemoveCachedShuffleDescriptorTest.getConsumedCachedShuffleDescriptor(executionGraph, v2, 1), jobId, blobWriter);
        Assert.assertEquals((long)1L, (long)shuffleDescriptorsForOtherVertex.length);
        Assert.assertEquals((long)expectedAfter, (long)blobWriter.numberOfBlobs());
    }

    private DefaultScheduler createSchedulerAndDeploy(JobID jobId, JobVertex v1, JobVertex v2, DistributionPattern distributionPattern, BlobWriter blobWriter) throws Exception {
        v2.connectNewDataSetAsInput(v1, distributionPattern, ResultPartitionType.BLOCKING);
        ArrayList<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
        DefaultScheduler scheduler = RemoveCachedShuffleDescriptorTest.createScheduler(jobId, ordered, blobWriter, this.mainThreadExecutor, this.ioExecutor);
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        TestingLogicalSlotBuilder slotBuilder = new TestingLogicalSlotBuilder();
        CompletableFuture.runAsync(() -> {
            try {
                RemoveCachedShuffleDescriptorTest.deployTasks(executionGraph, v1.getID(), slotBuilder);
                RemoveCachedShuffleDescriptorTest.transitionTasksToFinished(executionGraph, v1.getID());
                RemoveCachedShuffleDescriptorTest.deployTasks(executionGraph, v2.getID(), slotBuilder);
            }
            catch (Exception e) {
                throw new RuntimeException("Exceptions shouldn't happen here.", e);
            }
        }, (Executor)this.mainThreadExecutor).join();
        return scheduler;
    }

    private void triggerGlobalFailoverAndComplete(DefaultScheduler scheduler, JobVertex upstream) throws TimeoutException {
        Exception t = new Exception();
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        CompletableFuture.runAsync(() -> {
            scheduler.handleGlobalFailure(t);
            for (ExecutionVertex ev : executionGraph.getAllExecutionVertices()) {
                ev.getCurrentExecutionAttempt().completeCancelling();
            }
        }, (Executor)this.mainThreadExecutor).join();
        for (ExecutionVertex ev : Objects.requireNonNull(executionGraph.getJobVertex(upstream.getID())).getTaskVertices()) {
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.DEPLOYING, 1000L);
        }
    }

    private void triggerExceptionAndComplete(ExecutionGraph executionGraph, JobVertex upstream, JobVertex downstream) throws TimeoutException {
        ExecutionVertex ev11 = Objects.requireNonNull(executionGraph.getJobVertex(upstream.getID())).getTaskVertices()[0];
        ExecutionVertex ev21 = Objects.requireNonNull(executionGraph.getJobVertex(downstream.getID())).getTaskVertices()[0];
        CompletableFuture.runAsync(() -> ev21.markFailed((Throwable)new PartitionNotFoundException(new ResultPartitionID())), (Executor)this.mainThreadExecutor).join();
        ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev11, ExecutionState.DEPLOYING, 1000L);
    }

    private static DefaultScheduler createScheduler(JobID jobId, List<JobVertex> jobVertices, BlobWriter blobWriter, ComponentMainThreadExecutor mainThreadExecutor, ScheduledExecutorService ioExecutor) throws Exception {
        JobGraph jobGraph = JobGraphBuilder.newBatchJobGraphBuilder().setJobId(jobId).addJobVertices(jobVertices).build();
        return SchedulerTestingUtils.newSchedulerBuilder(jobGraph, mainThreadExecutor).setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L)).setBlobWriter(blobWriter).setIoExecutor(ioExecutor).build();
    }

    private static void deployTasks(ExecutionGraph executionGraph, JobVertexID jobVertexID, TestingLogicalSlotBuilder slotBuilder) throws JobException, ExecutionException, InterruptedException {
        for (ExecutionVertex vertex : Objects.requireNonNull(executionGraph.getJobVertex(jobVertexID)).getTaskVertices()) {
            TestingLogicalSlot slot = slotBuilder.createTestingLogicalSlot();
            Execution execution = vertex.getCurrentExecutionAttempt();
            execution.registerProducedPartitions(slot.getTaskManagerLocation(), true).get();
            vertex.tryAssignResource((LogicalSlot)slot);
            vertex.deploy();
        }
    }

    private static void transitionTasksToFinished(ExecutionGraph executionGraph, JobVertexID jobVertexID) {
        for (ExecutionVertex vertex : Objects.requireNonNull(executionGraph.getJobVertex(jobVertexID)).getTaskVertices()) {
            RemoveCachedShuffleDescriptorTest.transitionTaskToFinished(executionGraph, vertex);
        }
    }

    private static void transitionTaskToFinished(ExecutionGraph executionGraph, ExecutionVertex executionVertex) {
        executionGraph.updateState(new TaskExecutionStateTransition(new TaskExecutionState(executionVertex.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FINISHED)));
    }

    private static TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]> getConsumedCachedShuffleDescriptor(ExecutionGraph executionGraph, JobVertex vertex) {
        return RemoveCachedShuffleDescriptorTest.getConsumedCachedShuffleDescriptor(executionGraph, vertex, 0);
    }

    private static TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]> getConsumedCachedShuffleDescriptor(ExecutionGraph executionGraph, JobVertex vertex, int taskNum) {
        ExecutionJobVertex ejv = executionGraph.getJobVertex(vertex.getID());
        List consumedResults = Objects.requireNonNull(ejv).getInputs();
        IntermediateResult consumedResult = (IntermediateResult)consumedResults.get(0);
        return consumedResult.getCachedShuffleDescriptors(ejv.getTaskVertices()[taskNum].getConsumedPartitionGroup(0));
    }
}

