/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ExecutionVertexTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    ExecutionVertexTest() {
    }

    @Test
    void testResetForNewExecutionReleasesPartitions() throws Exception {
        JobVertex producerJobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobVertex consumerJobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        consumerJobVertex.connectNewDataSetAsInput(producerJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        CompletableFuture releasePartitionsFuture = new CompletableFuture();
        TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(producerJobVertex, consumerJobVertex);
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setPartitionTracker(partitionTracker).build();
        scheduler.startScheduling();
        ExecutionJobVertex producerExecutionJobVertex = scheduler.getExecutionJobVertex(producerJobVertex.getID());
        Execution execution = producerExecutionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
        Assertions.assertThat(releasePartitionsFuture).isNotDone();
        execution.markFinished();
        Assertions.assertThat(releasePartitionsFuture).isNotDone();
        for (ExecutionVertex executionVertex : producerExecutionJobVertex.getTaskVertices()) {
            executionVertex.resetForNewExecution();
        }
        IntermediateResultPartitionID intermediateResultPartitionID = producerExecutionJobVertex.getProducedDataSets()[0].getPartitions()[0].getPartitionId();
        ResultPartitionID resultPartitionID = ((ResultPartitionDeploymentDescriptor)execution.getResultPartitionDeploymentDescriptor(intermediateResultPartitionID).get()).getShuffleDescriptor().getResultPartitionID();
        Assertions.assertThat((Collection)((Collection)releasePartitionsFuture.get())).contains((Object[])new ResultPartitionID[]{resultPartitionID});
    }

    @Test
    void testFindLatestAllocationIgnoresFailedAttempts() throws Exception {
        JobVertex source = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(source);
        TestingPhysicalSlotProvider withLimitedAmountOfPhysicalSlots = TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE, (Object)1);
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setJobMasterConfiguration(configuration).setExecutionSlotAllocatorFactory((ExecutionSlotAllocatorFactory)SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(withLimitedAmountOfPhysicalSlots)).build();
        scheduler.startScheduling();
        ExecutionJobVertex sourceExecutionJobVertex = scheduler.getExecutionJobVertex(source.getID());
        ExecutionVertex sourceExecutionVertex = sourceExecutionJobVertex.getTaskVertices()[0];
        Execution firstExecution = sourceExecutionVertex.getCurrentExecutionAttempt();
        TestingPhysicalSlot physicalSlot = withLimitedAmountOfPhysicalSlots.getFirstResponseOrFail().join();
        AllocationID allocationId = physicalSlot.getAllocationId();
        TaskManagerLocation taskManagerLocation = physicalSlot.getTaskManagerLocation();
        this.cancelExecution(firstExecution);
        sourceExecutionVertex.resetForNewExecution();
        Assertions.assertThat((Optional)sourceExecutionVertex.findLastAllocation()).hasValue((Object)allocationId);
        Assertions.assertThat((Optional)sourceExecutionVertex.findLastLocation()).hasValue((Object)taskManagerLocation);
        Execution secondExecution = sourceExecutionVertex.getCurrentExecutionAttempt();
        this.cancelExecution(secondExecution);
        sourceExecutionVertex.resetForNewExecution();
        Assertions.assertThat((Optional)sourceExecutionVertex.findLastAllocation()).hasValue((Object)allocationId);
        Assertions.assertThat((Optional)sourceExecutionVertex.findLastLocation()).hasValue((Object)taskManagerLocation);
    }

    private void cancelExecution(Execution execution) {
        execution.cancel();
        execution.completeCancelling();
    }
}

