/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner;
import org.apache.flink.runtime.jobmaster.JobMasterServiceProcess;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingJobMasterServiceProcess;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.LeaderInformationRegister;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionDriver;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class JobMasterServiceLeadershipRunnerTest {
    /** Timeout passed to the runner's request and cancel calls throughout this test class. */
    private static final Duration TESTING_TIMEOUT = Duration.ofSeconds(10L);
    /** Job graph shared by all tests; assigned once in {@code setupClass()}. */
    private static JobGraph jobGraph;
    /** Leader election handed to the runners by default; recreated in {@code setup()}. */
    private TestingLeaderElection leaderElection;
    /** Fatal error handler passed to every runner; checked in {@code tearDown()}. */
    private TestingFatalErrorHandler fatalErrorHandler;
    /** Job result store passed to every runner; recreated in {@code setup()}. */
    private JobResultStore jobResultStore;

    // Explicit no-argument constructor with an empty body; the instance fields are
    // populated by setup() rather than here.
    JobMasterServiceLeadershipRunnerTest() {
    }

    /** Builds the streaming job graph (a single no-op vertex) shared by all tests. */
    @BeforeAll
    static void setupClass() {
        final JobVertex vertex = new JobVertex("Test vertex");
        vertex.setInvokableClass(NoOpInvokable.class);

        jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);
    }

    /** Creates fresh collaborators for every test so that no state leaks between tests. */
    @BeforeEach
    void setup() {
        fatalErrorHandler = new TestingFatalErrorHandler();
        jobResultStore = new EmbeddedJobResultStore();
        leaderElection = new TestingLeaderElection();
    }

    /**
     * Closes the per-test leader election and then asks the fatal error handler to rethrow
     * whatever error it holds.
     */
    @AfterEach
    void tearDown() throws Exception {
        // Keep this order: close first, then rethrow.
        leaderElection.close();
        fatalErrorHandler.rethrowError();
    }

    /**
     * Verifies that closing a started runner completes its result future with a SUSPENDED job
     * (see {@code assertJobNotFinished}) and that the job master gateway future fails.
     */
    @Test
    void testShutDownSignalsJobAsNotFinished() throws Exception {
        try (JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder().build()) {
            jobManagerRunner.start();

            // Properly typed instead of a raw future, so no unchecked conversion is needed
            // when handing it to assertJobNotFinished.
            final CompletableFuture<JobManagerRunnerResult> resultFuture =
                    jobManagerRunner.getResultFuture();
            Assertions.assertThat(resultFuture).isNotDone();

            jobManagerRunner.closeAsync();

            assertJobNotFinished(resultFuture);
            FlinkAssertions.assertThatFuture(jobManagerRunner.getJobMasterGateway())
                    .eventuallyFails();
        }
    }

    /** Verifies that closing the runner invokes the close callback of the class loader lease. */
    @Test
    void testCloseReleasesClassLoaderLease() throws Exception {
        final OneShotLatch leaseClosedLatch = new OneShotLatch();
        final TestingClassLoaderLease lease =
                TestingClassLoaderLease.newBuilder()
                        .setCloseRunnable(leaseClosedLatch::trigger)
                        .build();

        try (JobMasterServiceLeadershipRunner runner =
                newJobMasterServiceLeadershipRunnerBuilder().setClassLoaderLease(lease).build()) {
            runner.start();
            runner.close();

            // Returns only once the lease's close callback has triggered the latch.
            leaseClosedLatch.await();
        }
    }

    /**
     * Verifies that a newly granted leadership is not confirmed while the previous
     * {@link JobMasterServiceProcess} is still closing, and that it is confirmed with the
     * expected session id once the close future completes.
     */
    @Test
    void testConcurrentLeadershipOperationsBlockingClose() throws Exception {
        // Completed manually below to control when the first process finishes closing.
        // Typed as Void because it is returned from the process's close-async supplier.
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withJobMasterServiceProcesses(
                                TestingJobMasterServiceProcess.newBuilder()
                                        .setCloseAsyncSupplier(() -> terminationFuture)
                                        .build(),
                                TestingJobMasterServiceProcess.newBuilder().build())
                        .build();

        jobManagerRunner.start();

        // First leadership round: grant (and wait for the confirmation), then revoke.
        leaderElection.isLeader(UUID.randomUUID()).get();
        leaderElection.notLeader();

        final UUID expectedLeaderSessionID = UUID.randomUUID();
        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(expectedLeaderSessionID);

        FlinkAssertions.assertThatFuture(confirmedLeaderInformation)
                .as("The new leadership should wait first for the suspension to happen.")
                .willNotCompleteWithin(Duration.ofMillis(1L));

        terminationFuture.complete(null);

        FlinkAssertions.assertThatFuture(confirmedLeaderInformation)
                .eventuallySucceeds()
                .extracting(LeaderInformation::getLeaderSessionID)
                .isEqualTo(expectedLeaderSessionID);
    }

    /**
     * Verifies that an exceptional completion of the process's result future surfaces on the
     * runner's result future with the same cause.
     */
    @Test
    void testExceptionallyCompletedResultFutureFromJobMasterServiceProcessIsForwarded()
            throws Exception {
        // Typed (not raw) so the supplier below matches the builder's expected future type.
        final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>();
        final TestingJobMasterServiceProcess testingJobMasterServiceProcess =
                TestingJobMasterServiceProcess.newBuilder()
                        .setGetResultFutureSupplier(() -> resultFuture)
                        .build();

        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(testingJobMasterServiceProcess)
                        .build();

        jobManagerRunner.start();
        leaderElection.isLeader(UUID.randomUUID()).get();

        final FlinkException cause =
                new FlinkException("The JobMasterService failed unexpectedly.");
        resultFuture.completeExceptionally(cause);

        FlinkAssertions.assertThatFuture(jobManagerRunner.getResultFuture())
                .eventuallyFailsWith(ExecutionException.class)
                .withCause(cause);
    }

    /**
     * Verifies that an initialization-failure result reported by the process is what the
     * runner's result future completes with.
     */
    @Test
    void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationError()
            throws Exception {
        final FlinkException initializationError = new FlinkException("Test exception");
        final JobManagerRunnerResult failedInitialization =
                JobManagerRunnerResult.forInitializationFailure(
                        createFailedExecutionGraphInfo(initializationError), initializationError);
        final CompletableFuture<JobManagerRunnerResult> completedResultFuture =
                CompletableFuture.completedFuture(failedInitialization);

        final JobMasterServiceLeadershipRunner runner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(
                                TestingJobMasterServiceProcess.newBuilder()
                                        .setGetResultFutureSupplier(() -> completedResultFuture)
                                        .build())
                        .build();

        runner.start();
        leaderElection.isLeader(UUID.randomUUID());

        final JobManagerRunnerResult actualResult = runner.getResultFuture().join();
        Assertions.assertThat(actualResult.isInitializationFailure()).isTrue();
        Assertions.assertThat(actualResult.getInitializationFailure())
                .isEqualTo(initializationError);
    }

    /**
     * Wraps a sparse archived execution graph in state FAILED, built from the shared job
     * graph's id, name and type, into an {@link ExecutionGraphInfo}.
     *
     * @param testException the failure passed on to the archived execution graph
     * @return the execution graph info describing the failed job
     */
    @Nonnull
    private ExecutionGraphInfo createFailedExecutionGraphInfo(FlinkException testException) {
        final ArchivedExecutionGraph failedGraph =
                ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
                        jobGraph.getJobID(),
                        jobGraph.getName(),
                        JobStatus.FAILED,
                        jobGraph.getJobType(),
                        testException,
                        null,
                        1L);
        return new ExecutionGraphInfo(failedGraph);
    }

    /**
     * Verifies that closing the runner closes the running {@link JobMasterServiceProcess} and
     * reports the job as not finished.
     */
    @Test
    void testJobMasterServiceProcessIsTerminatedOnClose() throws Exception {
        // Completed from inside the close-async supplier, so "done" means close was invoked.
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(
                                TestingJobMasterServiceProcess.newBuilder()
                                        .setCloseAsyncSupplier(
                                                () -> {
                                                    terminationFuture.complete(null);
                                                    return terminationFuture;
                                                })
                                        .build())
                        .build();

        jobManagerRunner.start();
        leaderElection.isLeader(UUID.randomUUID());

        jobManagerRunner.closeAsync().join();

        assertJobNotFinished(jobManagerRunner.getResultFuture());
        Assertions.assertThat(terminationFuture).isDone();
    }

    /** Verifies that revoking leadership closes the running {@link JobMasterServiceProcess}. */
    @Test
    void testJobMasterServiceProcessShutdownOnLeadershipLoss() throws Exception {
        // Completed from inside the close-async supplier, so "done" means close was invoked.
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(
                                TestingJobMasterServiceProcess.newBuilder()
                                        .setCloseAsyncSupplier(
                                                () -> {
                                                    terminationFuture.complete(null);
                                                    return terminationFuture;
                                                })
                                        .build())
                        .build();

        jobManagerRunner.start();

        leaderElection.isLeader(UUID.randomUUID());
        leaderElection.notLeader();

        Assertions.assertThat(terminationFuture).isDone();
    }

    /**
     * Verifies that a cancel request issued before the job master gateway is available stays
     * pending and is forwarded to the gateway once the gateway future completes.
     */
    @Test
    void testCancellationIsForwardedToJobMasterService() throws Exception {
        // Typed with the gateway interface (not the testing implementation) so the future
        // can be returned from the process's gateway-future supplier.
        final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
                new CompletableFuture<>();
        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(
                                TestingJobMasterServiceProcess.newBuilder()
                                        .setGetJobMasterGatewayFutureSupplier(
                                                () -> jobMasterGatewayFuture)
                                        .build())
                        .build();

        jobManagerRunner.start();
        leaderElection.isLeader(UUID.randomUUID());

        // No gateway yet, so the cancellation cannot have completed.
        final CompletableFuture<Acknowledge> cancellationFuture =
                jobManagerRunner.cancel(TESTING_TIMEOUT);
        Assertions.assertThat(cancellationFuture).isNotDone();

        final AtomicBoolean cancelCalled = new AtomicBoolean(false);
        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setCancelFunction(
                                () -> {
                                    cancelCalled.set(true);
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .build();

        jobMasterGatewayFuture.complete(jobMasterGateway);

        // Wait for the cancellation before checking that the gateway was actually called.
        cancellationFuture.get();
        Assertions.assertThat(cancelCalled).isTrue();
    }

    /**
     * Verifies that the job information requests report INITIALIZING both before leadership is
     * granted and afterwards while the process reports that it is not yet running.
     */
    @Test
    void testJobInformationOperationsDuringInitialization() throws Exception {
        final TestingJobMasterServiceProcess initializingProcess =
                TestingJobMasterServiceProcess.newBuilder()
                        .setIsInitializedAndRunningSupplier(() -> false)
                        .build();
        final JobMasterServiceLeadershipRunner runner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(initializingProcess)
                        .build();

        runner.start();

        // Before leadership is granted.
        assertInitializingStates(runner);

        // After leadership is granted.
        leaderElection.isLeader(UUID.randomUUID());
        assertInitializingStates(runner);
    }

    /**
     * Asserts that the job status, the archived execution graph and the job details all report
     * INITIALIZING, and that the result future has not completed.
     *
     * @param jobManagerRunner the runner to query, using {@code TESTING_TIMEOUT} for each request
     */
    private static void assertInitializingStates(JobManagerRunner jobManagerRunner)
            throws ExecutionException, InterruptedException {
        final JobStatus reportedStatus = jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get();
        Assertions.assertThat(reportedStatus).isEqualTo(JobStatus.INITIALIZING);

        Assertions.assertThat(jobManagerRunner.getResultFuture()).isNotDone();

        final ExecutionGraphInfo graphInfo = jobManagerRunner.requestJob(TESTING_TIMEOUT).get();
        Assertions.assertThat(graphInfo.getArchivedExecutionGraph().getState())
                .isEqualTo(JobStatus.INITIALIZING);

        final JobDetails details = jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get();
        Assertions.assertThat(details.getStatus()).isEqualTo(JobStatus.INITIALIZING);
    }

    /**
     * Toggles leadership repeatedly while the first process is still closing, then verifies
     * that closing the runner closes the second (and last) configured process.
     */
    @Test
    void testSkippingOfEnqueuedLeadershipOperations() throws Exception {
        // Both futures are returned from close-async suppliers and are therefore typed Void.
        final CompletableFuture<Void> firstTerminationFuture = new CompletableFuture<>();
        final CompletableFuture<Void> secondTerminationFuture = new CompletableFuture<>();

        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withJobMasterServiceProcesses(
                                TestingJobMasterServiceProcess.newBuilder()
                                        .setCloseAsyncSupplier(() -> firstTerminationFuture)
                                        .setIsInitializedAndRunningSupplier(() -> false)
                                        .build(),
                                TestingJobMasterServiceProcess.newBuilder()
                                        .setCloseAsyncSupplier(
                                                () -> {
                                                    secondTerminationFuture.complete(null);
                                                    return secondTerminationFuture;
                                                })
                                        .build())
                        .build();

        jobManagerRunner.start();

        // First leadership creates the first process.
        leaderElection.isLeader(UUID.randomUUID());
        Assertions.assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get())
                .isEqualTo(JobStatus.INITIALIZING);

        // Only two processes are configured, and the factory fails on a third request.
        // These toggles happen while the first process has not finished closing.
        for (int i = 0; i < 10; ++i) {
            leaderElection.notLeader();
            leaderElection.isLeader(UUID.randomUUID());
        }

        firstTerminationFuture.complete(null);

        jobManagerRunner.closeAsync();

        Assertions.assertThat(secondTerminationFuture).isDone();
    }

    /** Runs the cancellation-failure scenario with an initialization-failure job result. */
    @Test
    void testCancellationFailsWhenInitializationFails() throws Exception {
        final FlinkException failure = new FlinkException("test exception");

        runCancellationFailsTest(
                pendingResult -> {
                    // Built lazily, at the moment the scenario completes the result future.
                    final ExecutionGraphInfo failedGraphInfo =
                            createFailedExecutionGraphInfo(failure);
                    pendingResult.complete(
                            JobManagerRunnerResult.forInitializationFailure(
                                    failedGraphInfo, failure));
                });
    }

    /** Runs the cancellation-failure scenario with an exceptionally completed result future. */
    @Test
    void testCancellationFailsWhenExceptionOccurs() throws Exception {
        final FlinkException failure = new FlinkException("test exception");

        runCancellationFailsTest(
                pendingResult -> {
                    pendingResult.completeExceptionally(failure);
                });
    }

    /**
     * Shared scenario: cancels a job whose job master gateway never becomes available, lets
     * {@code testAction} complete the process's result future, and expects the pending
     * cancellation to fail with a message containing "Cancellation failed.".
     *
     * @param testAction completes the process's result future, normally or exceptionally
     */
    void runCancellationFailsTest(Consumer<CompletableFuture<JobManagerRunnerResult>> testAction)
            throws Exception {
        // Typed (not raw) so it can be passed to testAction without an unchecked conversion.
        final CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture =
                new CompletableFuture<>();

        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(
                                TestingJobMasterServiceProcess.newBuilder()
                                        // A fresh, never-completed gateway future on every call.
                                        .setGetJobMasterGatewayFutureSupplier(
                                                CompletableFuture::new)
                                        .setGetResultFutureSupplier(
                                                () -> jobManagerRunnerResultFuture)
                                        .setIsInitializedAndRunningSupplier(() -> false)
                                        .build())
                        .build();

        jobManagerRunner.start();
        leaderElection.isLeader(UUID.randomUUID());

        Assertions.assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get())
                .isEqualTo(JobStatus.INITIALIZING);

        final CompletableFuture<Acknowledge> cancelFuture =
                jobManagerRunner.cancel(TESTING_TIMEOUT);
        Assertions.assertThat(cancelFuture).isNotDone();

        testAction.accept(jobManagerRunnerResultFuture);

        Assertions.assertThatThrownBy(cancelFuture::get)
                .hasMessageContaining("Cancellation failed.");
    }

    /**
     * Verifies that a result reported by a process whose leadership was already revoked does
     * not complete the runner's result future, and that closing the runner afterwards
     * completes it with a successful result in state SUSPENDED.
     */
    @Test
    void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception {
        final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>();
        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(
                                TestingJobMasterServiceProcess.newBuilder()
                                        .setGetResultFutureSupplier(() -> resultFuture)
                                        .build())
                        .build();

        jobManagerRunner.start();

        leaderElection.isLeader(UUID.randomUUID()).join();
        leaderElection.notLeader();

        Assertions.assertThat(jobManagerRunner.getResultFuture())
                .as("The runner result should not be completed by the leadership revocation.")
                .isNotDone();

        // The outdated process reports a result only after leadership was lost.
        resultFuture.complete(
                JobManagerRunnerResult.forSuccess(
                        createFailedExecutionGraphInfo(new FlinkException("test exception"))));

        Assertions.assertThat(jobManagerRunner.getResultFuture())
                .as("The runner result should not be completed if the leadership is lost.")
                .isNotDone();

        jobManagerRunner.closeAsync().get();

        // The lambda parameter is now typed through the assertion's generics rather than a
        // raw ThrowingConsumer array, so the JobManagerRunnerResult accessors resolve.
        FlinkAssertions.assertThatFuture(jobManagerRunner.getResultFuture())
                .eventuallySucceeds()
                .as(
                        "The runner result should be completed with a SUSPENDED job status if the job didn't finish when closing the runner, yet.")
                .satisfies(
                        result -> {
                            Assertions.assertThat(result.isSuccess()).isTrue();
                            Assertions.assertThat(
                                            result.getExecutionGraphInfo()
                                                    .getArchivedExecutionGraph()
                                                    .getState())
                                    .isEqualTo(JobStatus.SUSPENDED);
                        });
    }

    /**
     * Verifies that a job master gateway future obtained before a leadership grant/revoke
     * cycle eventually fails.
     */
    @Test
    void testJobMasterGatewayIsInvalidatedOnLeadershipChanges() throws Exception {
        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(
                                TestingJobMasterServiceProcess.newBuilder()
                                        // The process itself never supplies a gateway.
                                        .setGetJobMasterGatewayFutureSupplier(
                                                CompletableFuture::new)
                                        .build())
                        .build();

        jobManagerRunner.start();

        // Requested before any leadership change takes place.
        final CompletableFuture<JobMasterGateway> jobMasterGateway =
                jobManagerRunner.getJobMasterGateway();

        leaderElection.isLeader(UUID.randomUUID());
        leaderElection.notLeader();

        FlinkAssertions.assertThatFuture(jobMasterGateway).eventuallyFails();
    }

    /**
     * Verifies that a leader address that becomes available only after leadership was revoked
     * does not lead to a leadership confirmation.
     */
    @Test
    void testLeaderAddressOfOutdatedLeaderIsIgnored() throws Exception {
        final CompletableFuture<String> pendingLeaderAddress = new CompletableFuture<>();
        final TestingJobMasterServiceProcess process =
                TestingJobMasterServiceProcess.newBuilder()
                        .setGetLeaderAddressFutureSupplier(() -> pendingLeaderAddress)
                        .build();
        final JobMasterServiceLeadershipRunner runner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(process)
                        .build();

        runner.start();

        final CompletableFuture<LeaderInformation> confirmation =
                leaderElection.isLeader(UUID.randomUUID());
        leaderElection.notLeader();

        // The address arrives too late: leadership is already gone.
        pendingLeaderAddress.complete("foobar");

        FlinkAssertions.assertThatFuture(confirmation)
                .willNotCompleteWithin(Duration.ofMillis(5L));
    }

    /** Verifies that a started runner without leadership reports the status INITIALIZING. */
    @Test
    void testInitialJobStatusIsInitializing() throws Exception {
        final JobMasterServiceLeadershipRunner runner =
                newJobMasterServiceLeadershipRunnerBuilder().build();
        runner.start();

        final JobStatus reportedStatus = runner.requestJobStatus(TESTING_TIMEOUT).join();
        Assertions.assertThat(reportedStatus).isEqualTo(JobStatus.INITIALIZING);
    }

    /** Verifies that cancelling a started runner makes it report the status CANCELLING. */
    @Test
    void testCancellationChangesJobStatusToCancelling() throws Exception {
        final JobMasterServiceLeadershipRunner runner =
                newJobMasterServiceLeadershipRunnerBuilder().build();
        runner.start();

        // The returned cancellation future is intentionally not awaited.
        runner.cancel(TESTING_TIMEOUT);

        final JobStatus reportedStatus = runner.requestJobStatus(TESTING_TIMEOUT).join();
        Assertions.assertThat(reportedStatus).isEqualTo(JobStatus.CANCELLING);
    }

    /**
     * Verifies that after a cancel request followed by a leadership grant/revoke cycle the
     * runner reports INITIALIZING again.
     */
    @Test
    void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception {
        // Completed from inside the close-async supplier. The process reports "running"
        // only for as long as this future is not done.
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(
                                TestingJobMasterServiceProcess.newBuilder()
                                        .setCloseAsyncSupplier(
                                                () -> {
                                                    terminationFuture.complete(null);
                                                    return terminationFuture;
                                                })
                                        .setIsInitializedAndRunningSupplier(
                                                () -> !terminationFuture.isDone())
                                        .build())
                        .build();

        jobManagerRunner.start();

        jobManagerRunner.cancel(TESTING_TIMEOUT);

        leaderElection.isLeader(UUID.randomUUID());
        leaderElection.notLeader();

        Assertions.assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join())
                .isEqualTo(JobStatus.INITIALIZING);
    }

    /**
     * Verifies that a failure while closing the {@link JobMasterServiceProcess} (triggered by
     * a leadership revocation) fails the runner's result future with that failure as root cause.
     */
    @Test
    void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture() throws Exception {
        // Returned from the close-async supplier and failed manually below.
        final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .withSingleJobMasterServiceProcess(
                                TestingJobMasterServiceProcess.newBuilder()
                                        .setCloseAsyncSupplier(() -> terminationFuture)
                                        .build())
                        .build();

        jobManagerRunner.start();

        leaderElection.isLeader(UUID.randomUUID());
        leaderElection.notLeader();

        final FlinkException testException = new FlinkException("Test exception");
        terminationFuture.completeExceptionally(testException);

        FlinkAssertions.assertThatFuture(jobManagerRunner.getResultFuture())
                .eventuallyFailsWith(ExecutionException.class)
                .satisfies(cause -> Assertions.assertThat(cause).hasRootCause(testException));
    }

    /**
     * Verifies that an exception thrown by the process factory when leadership is granted
     * fails the runner's result future with that exception as root cause.
     */
    @Test
    void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture() throws Exception {
        final FlinkRuntimeException testException = new FlinkRuntimeException("Test exception");
        final JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .setJobMasterServiceProcessFactory(
                                TestingJobMasterServiceProcessFactory.newBuilder()
                                        .setJobMasterServiceProcessFunction(
                                                ignored -> {
                                                    throw testException;
                                                })
                                        .build())
                        .build();

        jobManagerRunner.start();

        leaderElection.isLeader(UUID.randomUUID());

        FlinkAssertions.assertThatFuture(jobManagerRunner.getResultFuture())
                .eventuallyFailsWith(ExecutionException.class)
                .satisfies(cause -> Assertions.assertThat(cause).hasRootCause(testException));
    }

    /**
     * Verifies that, when the job result store already holds a dirty result for the job, the
     * runner's result future completes with an execution graph in state FAILED once
     * leadership is granted.
     */
    @Test
    void testJobAlreadyDone() throws Exception {
        final JobID jobId = new JobID();
        final JobResult jobResult =
                TestingJobResultStore.createJobResult(jobId, ApplicationStatus.UNKNOWN);
        // Register the result before the runner is created and wait for the write to finish.
        jobResultStore.createDirtyResultAsync(new JobResultEntry(jobResult)).get();

        try (JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .setJobMasterServiceProcessFactory(
                                TestingJobMasterServiceProcessFactory.newBuilder()
                                        .setJobId(jobId)
                                        .build())
                        .build()) {
            jobManagerRunner.start();
            leaderElection.isLeader(UUID.randomUUID());

            // Typed future, so no raw type and no downcast of the result are needed.
            final CompletableFuture<JobManagerRunnerResult> resultFuture =
                    jobManagerRunner.getResultFuture();
            final JobManagerRunnerResult result = resultFuture.get();

            Assertions.assertThat(
                            result.getExecutionGraphInfo().getArchivedExecutionGraph().getState())
                    .isEqualTo(JobStatus.FAILED);
        }
    }

    /**
     * Closes the runner on one thread while a second thread revokes and re-grants leadership
     * through a real {@link DefaultLeaderElectionService}; both threads must terminate without
     * an error.
     *
     * <p>NOTE(review): presumably this guards against a lock-ordering deadlock between the
     * runner and the leader election service. Confirm against the production code.
     */
    @Test
    void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip()
            throws Exception {
        final AtomicReference<LeaderInformationRegister> storedLeaderInformation =
                new AtomicReference<>(LeaderInformationRegister.empty());
        final AtomicBoolean haBackendLeadershipFlag = new AtomicBoolean();

        final TestingLeaderElectionDriver.Factory driverFactory =
                new TestingLeaderElectionDriver.Factory(
                        TestingLeaderElectionDriver.newBuilder(
                                haBackendLeadershipFlag,
                                storedLeaderInformation,
                                new AtomicBoolean()));
        final DefaultLeaderElectionService defaultLeaderElectionService =
                new DefaultLeaderElectionService(driverFactory, fatalErrorHandler);

        // Triggered once the process's close-async supplier has been invoked.
        final OneShotLatch closeAsyncCalledTrigger = new OneShotLatch();
        // Holds back the class loader lease's close callback until the test releases it.
        final OneShotLatch triggerClassLoaderLeaseRelease = new OneShotLatch();

        final TestingJobMasterServiceProcess jobMasterServiceProcess =
                TestingJobMasterServiceProcess.newBuilder()
                        .setGetJobMasterGatewayFutureSupplier(CompletableFuture::new)
                        .setGetResultFutureSupplier(CompletableFuture::new)
                        .setGetLeaderAddressFutureSupplier(
                                () -> CompletableFuture.completedFuture("unused address"))
                        .setCloseAsyncSupplier(
                                () -> {
                                    closeAsyncCalledTrigger.trigger();
                                    return CompletableFuture.completedFuture(null);
                                })
                        .build();

        // Single source of truth for the component id used for registration and lookup.
        final String componentId = "random-component-id";
        final LeaderElection runnerLeaderElection =
                defaultLeaderElectionService.createLeaderElection(componentId);

        try (JobMasterServiceLeadershipRunner jobManagerRunner =
                newJobMasterServiceLeadershipRunnerBuilder()
                        .setClassLoaderLease(
                                TestingClassLoaderLease.newBuilder()
                                        .setCloseRunnable(
                                                () -> {
                                                    try {
                                                        // Wait for the test's go-ahead, then
                                                        // pause briefly before returning.
                                                        triggerClassLoaderLeaseRelease.await();
                                                        Thread.sleep(5L);
                                                    } catch (InterruptedException e) {
                                                        ExceptionUtils.checkInterrupted(e);
                                                    }
                                                })
                                        .build())
                        .setJobMasterServiceProcessFactory(
                                TestingJobMasterServiceProcessFactory.newBuilder()
                                        .setJobMasterServiceProcessFunction(
                                                ignoredSessionId -> jobMasterServiceProcess)
                                        .build())
                        .setLeaderElection(runnerLeaderElection)
                        .build()) {
            jobManagerRunner.start();

            // Grant leadership so that the job master service process gets created.
            haBackendLeadershipFlag.set(true);
            final UUID leaderSessionID = UUID.randomUUID();
            defaultLeaderElectionService.onGrantLeadership(leaderSessionID);

            // Wait until the stored leader information carries the granted session id.
            final SupplierWithException<Boolean, Exception> confirmationForSessionIdReceived =
                    () ->
                            storedLeaderInformation
                                    .get()
                                    .forComponentId(componentId)
                                    .map(LeaderInformation::getLeaderSessionID)
                                    .map(sessionId -> sessionId.equals(leaderSessionID))
                                    .orElse(false);
            CommonTestUtils.waitUntilCondition(confirmationForSessionIdReceived);

            final CheckedThread contenderCloseThread =
                    createCheckedThread(jobManagerRunner::close);
            contenderCloseThread.start();

            // Proceed only after the closing thread has reached the process's closeAsync.
            closeAsyncCalledTrigger.await();

            final CheckedThread grantLeadershipThread =
                    createCheckedThread(
                            () -> {
                                // Revoke before granting again.
                                defaultLeaderElectionService.onRevokeLeadership();
                                defaultLeaderElectionService.onGrantLeadership(UUID.randomUUID());
                            });
            grantLeadershipThread.start();

            // Let the class loader lease release (and with it the close call) continue.
            triggerClassLoaderLeaseRelease.trigger();

            contenderCloseThread.sync();
            grantLeadershipThread.sync();
        }
    }

    /**
     * Wraps the given callback in a {@link CheckedThread} whose {@code go()} method simply
     * runs the callback. The thread is returned without being started.
     *
     * @param callback the action to execute on the thread
     * @return a new, not yet started thread
     */
    private static CheckedThread createCheckedThread(final ThrowingRunnable<? extends Exception> callback) {
        final CheckedThread callbackThread =
                new CheckedThread() {
                    @Override
                    public void go() throws Exception {
                        callback.run();
                    }
                };
        return callbackThread;
    }

    /**
     * Waits for the given result future and asserts that the archived execution graph it
     * carries is in state SUSPENDED.
     *
     * @param resultFuture the runner's result future
     */
    private void assertJobNotFinished(CompletableFuture<JobManagerRunnerResult> resultFuture)
            throws ExecutionException, InterruptedException {
        final JobStatus finalState =
                resultFuture.get().getExecutionGraphInfo().getArchivedExecutionGraph().getState();

        Assertions.assertThat(finalState).isEqualTo(JobStatus.SUSPENDED);
    }

    /**
     * Creates a fresh runner builder bound to this test instance.
     *
     * @return a new builder with its default settings
     */
    public JobMasterServiceLeadershipRunnerBuilder newJobMasterServiceLeadershipRunnerBuilder() {
        final JobMasterServiceLeadershipRunnerBuilder builder =
                new JobMasterServiceLeadershipRunnerBuilder();
        return builder;
    }

    /**
     * Fluent builder for {@link JobMasterServiceLeadershipRunner} instances used in this test.
     *
     * <p>This is a non-static inner class because it reads the enclosing test's
     * {@code leaderElection}, {@code jobResultStore} and {@code fatalErrorHandler} fields.
     */
    public class JobMasterServiceLeadershipRunnerBuilder {
        private JobMasterServiceProcessFactory jobMasterServiceProcessFactory;
        private LibraryCacheManager.ClassLoaderLease classLoaderLease;
        private LeaderElection leaderElection;

        /**
         * Initializes the builder with a default testing process factory, a default testing
         * class loader lease and the enclosing test's leader election.
         */
        public JobMasterServiceLeadershipRunnerBuilder() {
            jobMasterServiceProcessFactory =
                    TestingJobMasterServiceProcessFactory.newBuilder().build();
            classLoaderLease = TestingClassLoaderLease.newBuilder().build();
            leaderElection = JobMasterServiceLeadershipRunnerTest.this.leaderElection;
        }

        /** Replaces the class loader lease passed to the runner. */
        public JobMasterServiceLeadershipRunnerBuilder setClassLoaderLease(LibraryCacheManager.ClassLoaderLease classLoaderLease) {
            this.classLoaderLease = classLoaderLease;
            return this;
        }

        /** Replaces the factory the runner uses to create job master service processes. */
        public JobMasterServiceLeadershipRunnerBuilder setJobMasterServiceProcessFactory(JobMasterServiceProcessFactory jobMasterServiceProcessFactory) {
            this.jobMasterServiceProcessFactory = jobMasterServiceProcessFactory;
            return this;
        }

        /** Replaces the leader election passed to the runner. */
        public JobMasterServiceLeadershipRunnerBuilder setLeaderElection(LeaderElection leaderElection) {
            this.leaderElection = leaderElection;
            return this;
        }

        /** Creates the runner from the currently configured collaborators. */
        public JobMasterServiceLeadershipRunner build() {
            return new JobMasterServiceLeadershipRunner(
                    jobMasterServiceProcessFactory,
                    leaderElection,
                    jobResultStore,
                    classLoaderLease,
                    fatalErrorHandler);
        }

        /** Shortcut for {@link #withJobMasterServiceProcesses} with exactly one process. */
        public JobMasterServiceLeadershipRunnerBuilder withSingleJobMasterServiceProcess(JobMasterServiceProcess jobMasterServiceProcess) {
            return withJobMasterServiceProcesses(jobMasterServiceProcess);
        }

        /**
         * Installs a factory that hands out the given processes in order, one per creation
         * request. A request made after all processes were handed out fails the non-null check.
         */
        public JobMasterServiceLeadershipRunnerBuilder withJobMasterServiceProcesses(JobMasterServiceProcess ... jobMasterServiceProcesses) {
            final ArrayDeque<JobMasterServiceProcess> remainingProcesses =
                    new ArrayDeque<>(Arrays.asList(jobMasterServiceProcesses));
            jobMasterServiceProcessFactory =
                    TestingJobMasterServiceProcessFactory.newBuilder()
                            .setJobMasterServiceProcessFunction(
                                    ignored -> Preconditions.checkNotNull(remainingProcesses.poll()))
                            .build();
            return this;
        }
    }
}

