/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultExecutionDeployer;
import org.apache.flink.runtime.scheduler.ExecutionDeployer;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.InternalFailuresListener;
import org.apache.flink.runtime.scheduler.TestExecutionOperationsDecorator;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
import org.apache.flink.runtime.shuffle.TestingShuffleMaster;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.IterableUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link DefaultExecutionDeployer}.
 *
 * <p>Every test builds an {@link ExecutionGraph} from a small {@link JobGraph}, hands its current
 * execution attempts to an {@link ExecutionDeployer} and then inspects what the test doubles
 * (slot allocator, execution operations, shuffle master, partition tracker) recorded.
 */
class DefaultExecutionDeployerTest {

    /** Executor handed to the execution graph builder; shut down after each test. */
    private ScheduledExecutorService executor;

    /** Main thread executor passed to the deployer under test. */
    private ComponentMainThreadExecutor mainThreadExecutor;

    /** Records deployed / failed executions and can be told to fail deployments. */
    private TestExecutionOperationsDecorator testExecutionOperations;

    private ExecutionVertexVersioner executionVertexVersioner;

    private TestExecutionSlotAllocator testExecutionSlotAllocator;

    private TestingShuffleMaster shuffleMaster;

    private TestingJobMasterPartitionTracker partitionTracker;

    /** Partition registration timeout passed to the deployer; tests may override it. */
    private Duration partitionRegistrationTimeout;

    /** Creates fresh test doubles and default settings before each test. */
    @BeforeEach
    void setUp() {
        executor = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        testExecutionOperations =
                new TestExecutionOperationsDecorator(
                        // No-op delegate: the decorator is what the tests inspect.
                        new ExecutionOperations() {
                            @Override
                            public void deploy(Execution execution) {}

                            @Override
                            public CompletableFuture<?> cancel(Execution execution) {
                                // Return a completed future instead of null so that a caller
                                // chaining on the result cannot hit a NullPointerException.
                                return CompletableFuture.completedFuture(null);
                            }

                            @Override
                            public void markFailed(Execution execution, Throwable cause) {}
                        });
        executionVertexVersioner = new ExecutionVertexVersioner();
        testExecutionSlotAllocator = new TestExecutionSlotAllocator();
        shuffleMaster = new TestingShuffleMaster();
        partitionTracker = new TestingJobMasterPartitionTracker();
        partitionRegistrationTimeout = Duration.ofMillis(5000L);
    }

    /** Shuts down the executor created in {@link #setUp()}. */
    @AfterEach
    void tearDown() {
        if (executor != null) {
            ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, executor);
        }
    }

    /** A single execution with an auto-completed slot request is deployed. */
    @Test
    void testDeployTasks() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
        final ExecutionDeployer executionDeployer = createExecutionDeployer();

        deployTasks(executionDeployer, executionGraph);

        Assertions.assertThat(testExecutionOperations.getDeployedExecutions())
                .containsExactly(getAnyExecution(executionGraph).getAttemptId());
    }

    /** Nothing is deployed until every pending slot request of the batch has been completed. */
    @Test
    void testDeployTasksOnlyIfAllSlotRequestsAreFulfilled() throws Exception {
        final JobGraph jobGraph = singleJobVertexJobGraph(4);
        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
        final ExecutionDeployer executionDeployer = createExecutionDeployer();
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();

        deployTasks(executionDeployer, executionGraph);
        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).isEmpty();

        // Completing only one of the four requests must not trigger any deployment.
        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
        testExecutionSlotAllocator.completePendingRequest(attemptId);
        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).isEmpty();

        testExecutionSlotAllocator.completePendingRequests();
        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(4);
    }

    /** A failing deployment marks the affected execution as failed. */
    @Test
    void testDeploymentFailures() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        testExecutionOperations.enableFailDeploy();
        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
        final ExecutionDeployer executionDeployer = createExecutionDeployer();

        deployTasks(executionDeployer, executionGraph);

        Assertions.assertThat(testExecutionOperations.getFailedExecutions())
                .containsExactly(getAnyExecution(executionGraph).getAttemptId());
    }

    /** Timing out one pending slot request fails exactly that execution. */
    @Test
    void testSlotAllocationTimeout() throws Exception {
        final JobGraph jobGraph = singleJobVertexJobGraph(2);
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
        final ExecutionDeployer executionDeployer = createExecutionDeployer();

        deployTasks(executionDeployer, executionGraph);
        Assertions.assertThat(testExecutionSlotAllocator.getPendingRequests()).hasSize(2);

        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
        testExecutionSlotAllocator.timeoutPendingRequest(attemptId);

        Assertions.assertThat(testExecutionOperations.getFailedExecutions())
                .containsExactly(attemptId);
    }

    /**
     * If the vertex is recorded as modified after the slot request was issued, completing the
     * request does not deploy the vertex.
     */
    @Test
    void testSkipDeploymentIfVertexVersionOutdated() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
        final ExecutionDeployer executionDeployer = createExecutionDeployer();

        deployTasks(executionDeployer, executionGraph);

        // Record a modification while the slot request is still pending.
        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
        executionVertexVersioner.recordModification(attemptId.getExecutionVertexId());
        testExecutionSlotAllocator.completePendingRequests();

        Assertions.assertThat(testExecutionOperations.getDeployedVertices()).isEmpty();
    }

    /**
     * If the vertex is recorded as modified after the slot request was issued, the slot obtained
     * for it is handed back to the allocator.
     */
    @Test
    void testReleaseSlotIfVertexVersionOutdated() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
        final ExecutionDeployer executionDeployer = createExecutionDeployer();

        deployTasks(executionDeployer, executionGraph);

        // Record a modification while the slot request is still pending.
        final ExecutionAttemptID attemptId = getAnyExecution(executionGraph).getAttemptId();
        executionVertexVersioner.recordModification(attemptId.getExecutionVertexId());
        testExecutionSlotAllocator.completePendingRequests();

        Assertions.assertThat(testExecutionSlotAllocator.getReturnedSlots()).hasSize(1);
    }

    /** Deploying the same executions a second time is rejected with an IllegalStateException. */
    @Test
    void testDeployOnlyIfVertexIsCreated() throws Exception {
        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
        final ExecutionDeployer executionDeployer = createExecutionDeployer();

        deployTasks(executionDeployer, executionGraph);

        Assertions.assertThatThrownBy(() -> deployTasks(executionDeployer, executionGraph))
                .as("IllegalStateException should happen")
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Deployment and partition tracking only happen once the pending produced-partition
     * registrations at the shuffle master have been completed.
     */
    @Test
    void testDeploymentWaitForProducedPartitionRegistration() throws Exception {
        shuffleMaster.setAutoCompleteRegistration(false);

        // Only the number of tracked partitions is asserted, so the element type stays general.
        final List<Object> trackedPartitions = new ArrayList<>();
        partitionTracker.setStartTrackingPartitionsConsumer(
                (resourceID, resultPartitionDeploymentDescriptor) ->
                        trackedPartitions.add(
                                resultPartitionDeploymentDescriptor
                                        .getShuffleDescriptor()
                                        .getResultPartitionID()));

        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
        final ExecutionDeployer executionDeployer = createExecutionDeployer();

        deployTasks(executionDeployer, executionGraph);
        Assertions.assertThat(trackedPartitions).isEmpty();
        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).isEmpty();

        shuffleMaster.completeAllPendingRegistrations();
        Assertions.assertThat(trackedPartitions).hasSize(1);
        Assertions.assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2);
    }

    /** Failing the pending partition registrations fails one execution. */
    @Test
    void testFailedProducedPartitionRegistration() throws Exception {
        shuffleMaster.setAutoCompleteRegistration(false);
        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
        final ExecutionDeployer executionDeployer = createExecutionDeployer();

        deployTasks(executionDeployer, executionGraph);
        Assertions.assertThat(testExecutionOperations.getFailedExecutions()).isEmpty();

        shuffleMaster.failAllPendingRegistrations();
        Assertions.assertThat(testExecutionOperations.getFailedExecutions()).hasSize(1);
    }

    /** A shuffle master configured to throw on registration results in one failed execution. */
    @Test
    void testDirectExceptionOnProducedPartitionRegistration() throws Exception {
        shuffleMaster.setThrowExceptionalOnRegistration(true);
        final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
        final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
        final ExecutionDeployer executionDeployer = createExecutionDeployer();

        deployTasks(executionDeployer, executionGraph);

        Assertions.assertThat(testExecutionOperations.getFailedExecutions()).hasSize(1);
    }

    /**
     * With a 1 ms registration timeout and registrations that never complete, one execution is
     * eventually reported as failed.
     */
    @Test
    void testProducedPartitionRegistrationTimeout() throws Exception {
        partitionRegistrationTimeout = Duration.ofMillis(1L);
        // This test backs the main thread executor with a dedicated single thread executor
        // instead of the direct "forMainThread" adapter installed by setUp().
        final ScheduledExecutorService timeoutExecutor =
                Executors.newSingleThreadScheduledExecutor();
        try {
            mainThreadExecutor =
                    ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                            timeoutExecutor);
            shuffleMaster.setAutoCompleteRegistration(false);

            final JobGraph jobGraph = nonParallelSourceSinkJobGraph();
            final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
            final ExecutionDeployer executionDeployer = createExecutionDeployer();

            deployTasks(executionDeployer, executionGraph);

            testExecutionOperations.awaitFailedExecutions(1);
        } finally {
            // Same shutdown routine as tearDown(), so the extra thread does not outlive the test.
            ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, timeoutExecutor);
        }
    }

    /** Builds a streaming job graph with one vertex of parallelism 1. */
    private static JobGraph singleNonParallelJobVertexJobGraph() {
        return singleJobVertexJobGraph(1);
    }

    /**
     * Builds a streaming job graph with a single no-op vertex.
     *
     * @param parallelism parallelism to set on the vertex
     * @return the job graph containing only that vertex
     */
    private static JobGraph singleJobVertexJobGraph(int parallelism) {
        final JobVertex vertex = new JobVertex("source");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(parallelism);
        return JobGraphTestUtils.streamingJobGraph(vertex);
    }

    /**
     * Builds a streaming job graph with a no-op source and a no-op sink, connected POINTWISE via
     * a PIPELINED result partition.
     */
    private static JobGraph nonParallelSourceSinkJobGraph() {
        final JobVertex source = new JobVertex("source");
        source.setInvokableClass(NoOpInvokable.class);

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(NoOpInvokable.class);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                sink, source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        return JobGraphTestUtils.streamingJobGraph(source, sink);
    }

    /**
     * Builds and starts an execution graph for the given job graph, wired to this test's shuffle
     * master and partition tracker.
     *
     * @param jobGraph job graph to build the execution graph from
     * @return the started execution graph
     * @throws Exception if building the execution graph fails
     */
    private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws Exception {
        final DefaultExecutionGraph executionGraph =
                TestingDefaultExecutionGraphBuilder.newBuilder()
                        .setJobGraph(jobGraph)
                        .setShuffleMaster(shuffleMaster)
                        .setPartitionTracker(partitionTracker)
                        .build(executor);
        executionGraph.setInternalTaskFailuresListener(new TestingInternalFailuresListener());
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        return executionGraph;
    }

    /**
     * Creates the deployer under test from the current field values; fields changed afterwards
     * (for example the timeout) do not affect an already created deployer.
     */
    private ExecutionDeployer createExecutionDeployer() {
        return new DefaultExecutionDeployer.Factory()
                .createInstance(
                        LoggerFactory.getLogger(DefaultExecutionDeployer.class),
                        testExecutionSlotAllocator,
                        testExecutionOperations,
                        executionVertexVersioner,
                        partitionRegistrationTimeout,
                        (ignored1, ignored2) -> {},
                        mainThreadExecutor);
    }

    /** Deploys the current execution attempt of every vertex of the given execution graph. */
    private void deployTasks(ExecutionDeployer executionDeployer, ExecutionGraph executionGraph) {
        deployTasks(
                executionDeployer,
                IterableUtils.toStream(executionGraph.getAllExecutionVertices())
                        .map(ExecutionVertex::getCurrentExecutionAttempt)
                        .collect(Collectors.toList()));
    }

    /**
     * Records a modification for the vertices of the given executions and asks the deployer to
     * allocate slots for and deploy them with the resulting versions.
     */
    private void deployTasks(ExecutionDeployer executionDeployer, List<Execution> executions) {
        executionDeployer.allocateSlotsAndDeploy(
                executions,
                executionVertexVersioner.recordVertexModifications(
                        executions.stream()
                                .map(Execution::getAttemptId)
                                .map(ExecutionAttemptID::getExecutionVertexId)
                                .collect(Collectors.toSet())));
    }

    /** Returns the first registered execution of the graph, in iteration order. */
    private static Execution getAnyExecution(ExecutionGraph executionGraph) {
        return executionGraph.getRegisteredExecutions().values().iterator().next();
    }
}

