/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ExecutionGraphCoLocationRestartTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private static final int NUM_TASKS = 31;

    ExecutionGraphCoLocationRestartTest() {
    }

    @Test
    void testConstraintsAfterRestart() throws Exception {
        long timeout = 5000L;
        JobVertex groupVertex = ExecutionGraphTestUtils.createNoOpVertex(31);
        JobVertex groupVertex2 = ExecutionGraphTestUtils.createNoOpVertex(31);
        JobVertexConnectionUtils.connectNewDataSetAsInput(groupVertex2, groupVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        groupVertex.setSlotSharingGroup(sharingGroup);
        groupVertex2.setSlotSharingGroup(sharingGroup);
        groupVertex.setStrictlyCoLocatedWith(groupVertex2);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(groupVertex, groupVertex2);
        ManuallyTriggeredScheduledExecutorService delayExecutor = new ManuallyTriggeredScheduledExecutorService();
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setExecutionSlotAllocatorFactory((ExecutionSlotAllocatorFactory)SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(TestingPhysicalSlotProvider.create(ignored -> CompletableFuture.completedFuture(TestingPhysicalSlot.builder().build())))).setDelayExecutor(delayExecutor).setRestartBackoffTimeStrategy(new FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory(1, 0L).create()).build();
        ExecutionGraph eg = scheduler.getExecutionGraph();
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.CREATED);
        scheduler.startScheduling();
        Predicate<AccessExecution> isDeploying = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(eg, isDeploying, 5000L);
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.RUNNING);
        this.validateConstraints(eg);
        ((ExecutionVertex)eg.getAllExecutionVertices().iterator().next()).fail((Throwable)new FlinkException("Test exception"));
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.RESTARTING);
        delayExecutor.triggerNonPeriodicScheduledTask();
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            if (vertex.getExecutionState() != ExecutionState.CANCELING) continue;
            vertex.getCurrentExecutionAttempt().completeCancelling();
        }
        ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RUNNING, 5000L);
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(eg, isDeploying, 5000L);
        this.validateConstraints(eg);
        ExecutionGraphTestUtils.finishAllVertices(eg);
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.FINISHED);
    }

    private void validateConstraints(ExecutionGraph eg) {
        ExecutionJobVertex[] tasks = eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]);
        for (int i = 0; i < 31; ++i) {
            TaskManagerLocation taskManagerLocation0 = tasks[0].getTaskVertices()[i].getCurrentAssignedResourceLocation();
            TaskManagerLocation taskManagerLocation1 = tasks[1].getTaskVertices()[i].getCurrentAssignedResourceLocation();
            Assertions.assertThat((Comparable)taskManagerLocation0).isEqualTo((Object)taskManagerLocation1);
        }
    }
}

