/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/**
 * Tests how an {@link Execution} interacts with the partition tracker, the TaskManager gateway
 * and the shuffle master when it leaves the running state.
 *
 * <p>Every test builds a two-vertex batch job (producer and consumer), brings the producer's
 * current execution attempt to the running state and then applies one or two state transitions
 * to it. The assertions check which tracking/release callbacks were invoked.
 */
public class ExecutionPartitionLifecycleTest extends TestLogger {

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorResource();

    /** Current execution attempt of the producer vertex; set by the setup helper. */
    private Execution execution;

    /** Deployment descriptor of the producer's first result partition; set by the setup helper. */
    private ResultPartitionDeploymentDescriptor descriptor;

    /** Resource id of the TaskManager location handed out with the slot; set by the setup helper. */
    private ResourceID taskExecutorResourceId;

    /** Id of the job under test; set by the setup helper. */
    private JobID jobId;

    /** Cancels the running execution and then marks it finished. */
    @Test
    public void testPartitionReleaseOnFinishWhileCanceling() throws Exception {
        testPartitionReleaseOnStateTransitionsAfterRunning(
                Execution::cancel, Execution::markFinished);
    }

    /** Marks the running execution finished and then cancels it. */
    @Test
    public void testPartitionReleaseOnCancelWhileFinished() throws Exception {
        testPartitionReleaseOnStateTransitionsAfterRunning(
                Execution::markFinished, Execution::cancel);
    }

    /** Marks the running execution finished and then suspends it. */
    @Test
    public void testPartitionReleaseOnSuspendWhileFinished() throws Exception {
        testPartitionReleaseOnStateTransitionsAfterRunning(
                Execution::markFinished, Execution::suspend);
    }

    /**
     * Runs a job with a {@code PIPELINED} partition and a no-op partition tracker, applies the two
     * given transitions in order and verifies that the partition release is issued only after the
     * second one.
     *
     * <p>The release is checked both on the TaskManager gateway (job id and partition id) and on
     * the shuffle master (exactly one externally released shuffle descriptor).
     *
     * @param stateTransition1 first transition; no release call may have happened after it
     * @param stateTransition2 second transition; the release call must have happened after it
     * @throws Exception if the job setup fails or the recorded release call cannot be retrieved
     */
    private void testPartitionReleaseOnStateTransitionsAfterRunning(
            Consumer<Execution> stateTransition1, Consumer<Execution> stateTransition2)
            throws Exception {
        final SimpleAckingTaskManagerGateway taskManagerGateway =
                new SimpleAckingTaskManagerGateway();
        final CompletableFuture<Tuple2<JobID, Collection<ResultPartitionID>>>
                releasePartitionsCallFuture = new CompletableFuture<>();
        taskManagerGateway.setReleasePartitionsConsumer(
                (jobID, partitionIds) ->
                        releasePartitionsCallFuture.complete(Tuple2.of(jobID, partitionIds)));

        final TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster();

        setupExecutionGraphAndStartRunningJob(
                ResultPartitionType.PIPELINED,
                NoOpJobMasterPartitionTracker.INSTANCE,
                taskManagerGateway,
                testingShuffleMaster);

        stateTransition1.accept(execution);
        Assert.assertFalse(releasePartitionsCallFuture.isDone());

        stateTransition2.accept(execution);
        Assert.assertTrue(releasePartitionsCallFuture.isDone());

        // The future is known to be complete at this point, so get() does not block.
        final Tuple2<JobID, Collection<ResultPartitionID>> releasePartitionsCall =
                releasePartitionsCallFuture.get();
        Assert.assertEquals(jobId, releasePartitionsCall.f0);
        Assert.assertThat(
                releasePartitionsCall.f1,
                Matchers.contains(descriptor.getShuffleDescriptor().getResultPartitionID()));

        Assert.assertEquals(1L, testingShuffleMaster.externallyReleasedPartitions.size());
        Assert.assertEquals(
                descriptor.getShuffleDescriptor(),
                testingShuffleMaster.externallyReleasedPartitions.poll());
    }

    /** Marking the execution finished must trigger neither tracker callback. */
    @Test
    public void testPartitionTrackedAndNotReleasedWhenFinished() throws Exception {
        testPartitionTrackingForStateTransition(
                Execution::markFinished, PartitionReleaseResult.NONE);
    }

    /**
     * Cancels the execution and completes the cancellation through the overload that also takes
     * accumulators, I/O metrics and a boolean flag; only "stop tracking" is expected.
     */
    @Test
    public void testPartitionNotTrackedAndNotReleasedWhenCanceledByTM() throws Exception {
        testPartitionTrackingForStateTransition(
                execution -> {
                    execution.cancel();
                    execution.completeCancelling(
                            Collections.emptyMap(),
                            new IOMetrics(0L, 0L, 0L, 0L, 0L, 0L, 0L),
                            false);
                },
                PartitionReleaseResult.STOP_TRACKING);
    }

    /**
     * Cancels the execution and completes the cancellation through the no-argument overload;
     * "stop tracking and release" is expected.
     */
    @Test
    public void testPartitionNotTrackedAndReleasedWhenCanceledByJM() throws Exception {
        testPartitionTrackingForStateTransition(
                execution -> {
                    execution.cancel();
                    execution.completeCancelling();
                },
                PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
    }

    /**
     * Fails the execution through the {@code markFailed} overload that also takes accumulators,
     * I/O metrics and boolean flags; only "stop tracking" is expected.
     */
    @Test
    public void testPartitionNotTrackedAndNotReleasedWhenFailedByTM() throws Exception {
        testPartitionTrackingForStateTransition(
                execution ->
                        execution.markFailed(
                                new Exception("Test exception"),
                                false,
                                Collections.emptyMap(),
                                new IOMetrics(0L, 0L, 0L, 0L, 0L, 0L, 0L),
                                false,
                                true),
                PartitionReleaseResult.STOP_TRACKING);
    }

    /**
     * Fails the execution through the single-argument {@code markFailed} overload; "stop tracking
     * and release" is expected.
     */
    @Test
    public void testPartitionNotTrackedAndReleasedWhenFailedByJM() throws Exception {
        testPartitionTrackingForStateTransition(
                execution -> execution.markFailed(new Exception("Test exception")),
                PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
    }

    /**
     * Runs a job with a {@code BLOCKING} partition and a recording partition tracker, verifies
     * that tracking was started for the produced partition, applies the given transition and then
     * checks which of the tracker's stop callbacks fired.
     *
     * @param stateTransition transition applied to the running execution
     * @param partitionReleaseResult the tracker interaction expected after the transition
     * @throws Exception if the job setup fails or a recorded tracker call cannot be retrieved
     */
    private void testPartitionTrackingForStateTransition(
            Consumer<Execution> stateTransition, PartitionReleaseResult partitionReleaseResult)
            throws Exception {
        final CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>>
                partitionStartTrackingFuture = new CompletableFuture<>();
        final CompletableFuture<Collection<ResultPartitionID>> partitionStopTrackingFuture =
                new CompletableFuture<>();
        final CompletableFuture<Collection<ResultPartitionID>>
                partitionStopTrackingAndReleaseFuture = new CompletableFuture<>();

        final TestingJobMasterPartitionTracker partitionTracker =
                new TestingJobMasterPartitionTracker();
        partitionTracker.setStartTrackingPartitionsConsumer(
                (resourceID, resultPartitionDeploymentDescriptor) ->
                        partitionStartTrackingFuture.complete(
                                Tuple2.of(resourceID, resultPartitionDeploymentDescriptor)));
        partitionTracker.setStopTrackingPartitionsConsumer(partitionStopTrackingFuture::complete);
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(
                partitionStopTrackingAndReleaseFuture::complete);

        setupExecutionGraphAndStartRunningJob(
                ResultPartitionType.BLOCKING,
                partitionTracker,
                new SimpleAckingTaskManagerGateway(),
                ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER);

        final Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall =
                partitionStartTrackingFuture.get();
        Assert.assertThat(startTrackingCall.f0, Matchers.equalTo(taskExecutorResourceId));
        Assert.assertThat(startTrackingCall.f1, Matchers.equalTo(descriptor));

        stateTransition.accept(execution);

        switch (partitionReleaseResult) {
            case NONE:
                Assert.assertFalse(partitionStopTrackingFuture.isDone());
                Assert.assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
                break;
            case STOP_TRACKING:
                Assert.assertTrue(partitionStopTrackingFuture.isDone());
                Assert.assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
                final Collection<ResultPartitionID> stopTrackingCall =
                        partitionStopTrackingFuture.get();
                Assert.assertEquals(
                        Collections.singletonList(
                                descriptor.getShuffleDescriptor().getResultPartitionID()),
                        stopTrackingCall);
                break;
            case STOP_TRACKING_AND_RELEASE:
                Assert.assertFalse(partitionStopTrackingFuture.isDone());
                Assert.assertTrue(partitionStopTrackingAndReleaseFuture.isDone());
                final Collection<ResultPartitionID> stopTrackingAndReleaseCall =
                        partitionStopTrackingAndReleaseFuture.get();
                Assert.assertEquals(
                        Collections.singletonList(
                                descriptor.getShuffleDescriptor().getResultPartitionID()),
                        stopTrackingAndReleaseCall);
                break;
            default:
                // Fail loudly instead of silently asserting nothing for a new enum constant.
                throw new AssertionError(
                        "Unexpected partition release result: " + partitionReleaseResult);
        }
    }

    /**
     * Builds a batch job of two no-op vertices connected all-to-all with the given partition
     * type, starts scheduling and switches the producer's current execution attempt to running.
     *
     * <p>On return, {@link #execution}, {@link #descriptor}, {@link #taskExecutorResourceId} and
     * {@link #jobId} are populated for use by the calling test.
     *
     * @param resultPartitionType type of the result partition between producer and consumer
     * @param partitionTracker partition tracker handed to the scheduler
     * @param taskManagerGateway gateway attached to every slot handed out by the slot provider
     * @param shuffleMaster shuffle master handed to the scheduler
     * @throws Exception if building the scheduler fails
     */
    private void setupExecutionGraphAndStartRunningJob(
            ResultPartitionType resultPartitionType,
            JobMasterPartitionTracker partitionTracker,
            TaskManagerGateway taskManagerGateway,
            ShuffleMaster<?> shuffleMaster)
            throws Exception {
        final JobVertex producerVertex = createNoOpJobVertex();
        final JobVertex consumerVertex = createNoOpJobVertex();
        consumerVertex.connectNewDataSetAsInput(
                producerVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);

        final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();

        // Every slot request is answered immediately with a slot on the same location/gateway.
        final TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.create(
                        resourceProfile ->
                                CompletableFuture.completedFuture(
                                        TestingPhysicalSlot.builder()
                                                .withTaskManagerGateway(taskManagerGateway)
                                                .withTaskManagerLocation(taskManagerLocation)
                                                .build()));

        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producerVertex, consumerVertex);
        final DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                jobGraph,
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .setShuffleMaster(shuffleMaster)
                        .setPartitionTracker(partitionTracker)
                        .build();

        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        final ExecutionJobVertex executionJobVertex =
                executionGraph.getJobVertex(producerVertex.getID());
        final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        execution = executionVertex.getCurrentExecutionAttempt();

        scheduler.startScheduling();
        execution.switchToRecovering();
        execution.switchToRunning();

        final IntermediateResultPartitionID expectedIntermediateResultPartitionId =
                executionJobVertex.getProducedDataSets()[0].getPartitions()[0].getPartitionId();

        descriptor =
                execution
                        .getResultPartitionDeploymentDescriptor(
                                expectedIntermediateResultPartitionId)
                        .orElseThrow(
                                () ->
                                        new AssertionError(
                                                "No deployment descriptor for partition "
                                                        + expectedIntermediateResultPartitionId));
        taskExecutorResourceId = taskManagerLocation.getResourceID();
        jobId = executionGraph.getJobID();
    }

    /**
     * Creates a job vertex named "Test vertex" with a fresh id that runs {@link NoOpInvokable}.
     *
     * @return the newly created vertex
     */
    @Nonnull
    private JobVertex createNoOpJobVertex() {
        final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return jobVertex;
    }

    /**
     * Shuffle descriptor that reports the producer's location as the place where it stores local
     * resources.
     */
    private static class TestingShuffleDescriptor implements ShuffleDescriptor {

        private static final long serialVersionUID = 1819950291216655728L;

        private final ExecutionAttemptID producerExecutionId;
        private final IntermediateResultPartitionID producedPartitionId;
        private final ResourceID producerLocation;

        TestingShuffleDescriptor(
                IntermediateResultPartitionID producedPartitionId,
                ExecutionAttemptID producerExecutionId,
                ResourceID producerLocation) {
            this.producedPartitionId = producedPartitionId;
            this.producerExecutionId = producerExecutionId;
            this.producerLocation = producerLocation;
        }

        /** Returns a new id built from the produced partition id and the producer execution id. */
        @Override
        public ResultPartitionID getResultPartitionID() {
            return new ResultPartitionID(producedPartitionId, producerExecutionId);
        }

        /** Always returns the producer location given at construction time. */
        @Override
        public Optional<ResourceID> storesLocalResourcesOn() {
            return Optional.of(producerLocation);
        }
    }

    /**
     * Shuffle master that hands out {@link TestingShuffleDescriptor}s and records every
     * externally released descriptor.
     */
    private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> {

        /** Released descriptors in call order; bounded to 4, so a fifth release would throw. */
        final Queue<ShuffleDescriptor> externallyReleasedPartitions = new ArrayBlockingQueue<>(4);

        private TestingShuffleMaster() {}

        /** Immediately completes with a descriptor built from the given partition and producer. */
        @Override
        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(
                JobID jobID,
                PartitionDescriptor partitionDescriptor,
                ProducerDescriptor producerDescriptor) {
            return CompletableFuture.completedFuture(
                    new TestingShuffleDescriptor(
                            partitionDescriptor.getPartitionId(),
                            producerDescriptor.getProducerExecutionId(),
                            producerDescriptor.getProducerLocation()));
        }

        /** Records the released descriptor so tests can assert on it. */
        @Override
        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
            externallyReleasedPartitions.add(shuffleDescriptor);
        }
    }

    /** Partition-tracker interaction a test expects after a state transition. */
    private enum PartitionReleaseResult {
        /** Neither stop callback of the tracker was invoked. */
        NONE,
        /** Only the "stop tracking" callback was invoked. */
        STOP_TRACKING,
        /** Only the "stop tracking and release" callback was invoked. */
        STOP_TRACKING_AND_RELEASE
    }
}

