/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Checks that an {@link ExecutionGraph} only switches to {@link JobStatus#FINISHED} once every
 * execution vertex of the job has been marked as finished.
 */
class ExecutionGraphFinishTest {
    /** Executor handed to the scheduler under test; managed by the JUnit extension. */
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /**
     * Marks the executions of a two-vertex job as finished one by one and asserts that the
     * per-vertex finished counter advances while the job stays {@link JobStatus#RUNNING}, and that
     * the job becomes {@link JobStatus#FINISHED} after the last execution finishes.
     *
     * @throws Exception if building the scheduler or driving the execution graph fails
     */
    @Test
    void testJobFinishes() throws Exception {
        // Two job vertices; the test below drives two task vertices (index 0 and 1) of each.
        JobGraph jobGraph =
                JobGraphTestUtils.streamingJobGraph(
                        ExecutionGraphTestUtils.createJobVertex("Task1", 2, NoOpInvokable.class),
                        ExecutionGraphTestUtils.createJobVertex("Task2", 2, NoOpInvokable.class));

        DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                jobGraph,
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .build();
        ExecutionGraph eg = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        // Typed iterator: avoids the raw type and the unchecked downcasts it would require.
        Iterator<ExecutionJobVertex> jobVertices = eg.getVerticesTopologically().iterator();
        ExecutionJobVertex sender = jobVertices.next();
        ExecutionJobVertex receiver = jobVertices.next();

        List<ExecutionVertex> senderVertices = Arrays.asList(sender.getTaskVertices());
        List<ExecutionVertex> receiverVertices = Arrays.asList(receiver.getTaskVertices());

        // Finishing the first sender execution bumps the counter but must not finish the job.
        senderVertices.get(0).getCurrentExecutionAttempt().markFinished();
        Assertions.assertThat(sender.getNumExecutionVertexFinished()).isOne();
        Assertions.assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);

        // The whole sender vertex is finished now, yet the receiver is still running.
        senderVertices.get(1).getCurrentExecutionAttempt().markFinished();
        Assertions.assertThat(sender.getNumExecutionVertexFinished()).isEqualTo(2);
        Assertions.assertThat(eg.getState()).isEqualTo(JobStatus.RUNNING);

        // Once the receiver executions finish as well, the job as a whole must be FINISHED.
        receiverVertices.get(0).getCurrentExecutionAttempt().markFinished();
        receiverVertices.get(1).getCurrentExecutionAttempt().markFinished();
        Assertions.assertThat(eg.getNumFinishedVertices()).isEqualTo(4);
        Assertions.assertThat(eg.getState()).isEqualTo(JobStatus.FINISHED);
    }
}

