/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner;
import org.apache.flink.runtime.jobmaster.JobMasterServiceProcess;
import org.apache.flink.runtime.jobmaster.TestingJobMasterServiceProcess;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class JobMasterServiceLeadershipRunnerTest
extends TestLogger {
    private static final Time TESTING_TIMEOUT = Time.seconds((long)10L);
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static JobGraph jobGraph;
    private TestingLeaderElectionService leaderElectionService;
    private TestingFatalErrorHandler fatalErrorHandler;
    private RunningJobsRegistry runningJobsRegistry;

    @BeforeClass
    public static void setupClass() {
        JobVertex jobVertex = new JobVertex("Test vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
    }

    @Before
    public void setup() {
        this.leaderElectionService = new TestingLeaderElectionService();
        this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
        this.fatalErrorHandler = new TestingFatalErrorHandler();
    }

    @After
    public void tearDown() throws Exception {
        this.fatalErrorHandler.rethrowError();
    }

    @Test
    public void testShutDownSignalsJobAsNotFinished() throws Exception {
        try (JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().build();){
            jobManagerRunner.start();
            CompletableFuture resultFuture = jobManagerRunner.getResultFuture();
            Assert.assertThat((Object)resultFuture.isDone(), (Matcher)Matchers.is((Object)false));
            jobManagerRunner.closeAsync();
            this.assertJobNotFinished(resultFuture);
            Assert.assertThat((Object)jobManagerRunner.getJobMasterGateway(), (Matcher)FlinkMatchers.futureWillCompleteExceptionally((Duration)Duration.ofMillis(5L)));
        }
    }

    @Test
    public void testCloseReleasesClassLoaderLease() throws Exception {
        OneShotLatch closeClassLoaderLeaseLatch = new OneShotLatch();
        TestingClassLoaderLease classLoaderLease = TestingClassLoaderLease.newBuilder().setCloseRunnable(() -> ((OneShotLatch)closeClassLoaderLeaseLatch).trigger()).build();
        try (JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().setClassLoaderLease(classLoaderLease).build();){
            jobManagerRunner.start();
            jobManagerRunner.close();
            closeClassLoaderLeaseLatch.await();
        }
    }

    @Test
    public void testConcurrentLeadershipOperationsBlockingClose() throws Exception {
        CompletableFuture<Void> terminationFuture = new CompletableFuture<Void>();
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withJobMasterServiceProcesses(TestingJobMasterServiceProcess.newBuilder().setTerminationFuture(terminationFuture).withManualTerminationFutureCompletion().build(), TestingJobMasterServiceProcess.newBuilder().build()).build();
        jobManagerRunner.start();
        this.leaderElectionService.isLeader(UUID.randomUUID()).get();
        this.leaderElectionService.notLeader();
        CompletableFuture<UUID> leaderFuture = this.leaderElectionService.isLeader(UUID.randomUUID());
        Assert.assertThat((Object)leaderFuture.isDone(), (Matcher)Matchers.is((Object)false));
        try {
            leaderFuture.get(1L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"Granted leadership even though the JobMaster has not been suspended.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        terminationFuture.complete(null);
        leaderFuture.get();
    }

    @Test
    public void testExceptionallyCompletedResultFutureFromJobMasterServiceProcessIsForwarded() throws Exception {
        CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<JobManagerRunnerResult>();
        TestingJobMasterServiceProcess testingJobMasterServiceProcess = TestingJobMasterServiceProcess.newBuilder().setJobManagerRunnerResultFuture(resultFuture).build();
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(testingJobMasterServiceProcess).build();
        jobManagerRunner.start();
        this.leaderElectionService.isLeader(UUID.randomUUID()).get();
        FlinkException cause = new FlinkException("The JobMasterService failed unexpectedly.");
        resultFuture.completeExceptionally((Throwable)cause);
        Assert.assertThat((Object)jobManagerRunner.getResultFuture(), (Matcher)FlinkMatchers.futureWillCompleteExceptionally(cause::equals, (Duration)Duration.ofMillis(5L), (String)"Wrong cause of failed result future"));
    }

    @Test
    public void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationError() throws Exception {
        FlinkException testException = new FlinkException("Test exception");
        CompletableFuture<JobManagerRunnerResult> completedResultFuture = CompletableFuture.completedFuture(JobManagerRunnerResult.forInitializationFailure((ExecutionGraphInfo)this.createFailedExecutionGraphInfo(testException), (Throwable)testException));
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setJobManagerRunnerResultFuture(completedResultFuture).build()).build();
        jobManagerRunner.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        JobManagerRunnerResult jobManagerRunnerResult = (JobManagerRunnerResult)jobManagerRunner.getResultFuture().join();
        Assert.assertTrue((boolean)jobManagerRunnerResult.isInitializationFailure());
        Assert.assertThat((Object)jobManagerRunnerResult.getInitializationFailure(), (Matcher)FlinkMatchers.containsCause((Throwable)testException));
    }

    @Nonnull
    private ExecutionGraphInfo createFailedExecutionGraphInfo(FlinkException testException) {
        return new ExecutionGraphInfo(ArchivedExecutionGraph.createFromInitializingJob((JobID)jobGraph.getJobID(), (String)jobGraph.getName(), (JobStatus)JobStatus.FAILED, (Throwable)testException, null, (long)1L));
    }

    @Test
    public void testJobMasterServiceProcessIsTerminatedOnClose() throws Exception {
        CompletableFuture<Void> terminationFuture = new CompletableFuture<Void>();
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setTerminationFuture(terminationFuture).build()).build();
        jobManagerRunner.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        jobManagerRunner.closeAsync().join();
        this.assertJobNotFinished(jobManagerRunner.getResultFuture());
        Assert.assertTrue((boolean)terminationFuture.isDone());
    }

    @Test
    public void testJobMasterServiceProcessShutdownOnLeadershipLoss() throws Exception {
        CompletableFuture<Void> terminationFuture = new CompletableFuture<Void>();
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setTerminationFuture(terminationFuture).build()).build();
        jobManagerRunner.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.leaderElectionService.notLeader();
        Assert.assertTrue((boolean)terminationFuture.isDone());
    }

    @Test
    public void testCancellationIsForwardedToJobMasterService() throws Exception {
        CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = new CompletableFuture<JobMasterGateway>();
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setJobMasterGatewayFuture(jobMasterGatewayFuture).build()).build();
        jobManagerRunner.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        CompletableFuture cancellationFuture = jobManagerRunner.cancel(TESTING_TIMEOUT);
        Assert.assertThat((Object)cancellationFuture.isDone(), (Matcher)Matchers.is((Object)false));
        AtomicBoolean cancelCalled = new AtomicBoolean(false);
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setCancelFunction(() -> {
            cancelCalled.set(true);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        jobMasterGatewayFuture.complete(jobMasterGateway);
        cancellationFuture.get();
        Assert.assertThat((Object)cancelCalled.get(), (Matcher)Matchers.is((Object)true));
    }

    @Test
    public void testJobInformationOperationsDuringInitialization() throws Exception {
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setIsInitialized(false).build()).build();
        jobManagerRunner.start();
        JobMasterServiceLeadershipRunnerTest.assertInitializingStates((JobManagerRunner)jobManagerRunner);
        this.leaderElectionService.isLeader(UUID.randomUUID());
        JobMasterServiceLeadershipRunnerTest.assertInitializingStates((JobManagerRunner)jobManagerRunner);
    }

    private static void assertInitializingStates(JobManagerRunner jobManagerRunner) throws ExecutionException, InterruptedException {
        Assert.assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(), (Matcher)Matchers.is((Object)JobStatus.INITIALIZING));
        Assert.assertThat((Object)jobManagerRunner.getResultFuture().isDone(), (Matcher)Matchers.is((Object)false));
        Assert.assertThat((Object)((ExecutionGraphInfo)jobManagerRunner.requestJob(TESTING_TIMEOUT).get()).getArchivedExecutionGraph().getState(), (Matcher)Matchers.is((Object)JobStatus.INITIALIZING));
        Assert.assertThat((Object)((JobDetails)jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get()).getStatus(), (Matcher)Matchers.is((Object)JobStatus.INITIALIZING));
    }

    @Test
    public void testSkippingOfEnqueuedLeadershipOperations() throws Exception {
        CompletableFuture<Void> firstTerminationFuture = new CompletableFuture<Void>();
        CompletableFuture<Void> secondTerminationFuture = new CompletableFuture<Void>();
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withJobMasterServiceProcesses(TestingJobMasterServiceProcess.newBuilder().setTerminationFuture(firstTerminationFuture).withManualTerminationFutureCompletion().setIsInitialized(false).build(), TestingJobMasterServiceProcess.newBuilder().setTerminationFuture(secondTerminationFuture).build()).build();
        jobManagerRunner.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        Assert.assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(), (Matcher)Matchers.is((Object)JobStatus.INITIALIZING));
        for (int i = 0; i < 10; ++i) {
            this.leaderElectionService.notLeader();
            this.leaderElectionService.isLeader(UUID.randomUUID());
        }
        firstTerminationFuture.complete(null);
        jobManagerRunner.closeAsync();
        Assert.assertTrue((boolean)secondTerminationFuture.isDone());
    }

    @Test
    public void testCancellationFailsWhenInitializationFails() throws Exception {
        FlinkException testException = new FlinkException("test exception");
        this.runCancellationFailsTest(resultFuture -> resultFuture.complete(JobManagerRunnerResult.forInitializationFailure((ExecutionGraphInfo)this.createFailedExecutionGraphInfo(testException), (Throwable)testException)));
    }

    @Test
    public void testCancellationFailsWhenExceptionOccurs() throws Exception {
        FlinkException testException = new FlinkException("test exception");
        this.runCancellationFailsTest(resultFuture -> resultFuture.completeExceptionally((Throwable)testException));
    }

    public void runCancellationFailsTest(Consumer<CompletableFuture<JobManagerRunnerResult>> testAction) throws Exception {
        CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture = new CompletableFuture<JobManagerRunnerResult>();
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setIsInitialized(false).setJobMasterGatewayFuture(new CompletableFuture<JobMasterGateway>()).setJobManagerRunnerResultFuture(jobManagerRunnerResultFuture).build()).build();
        jobManagerRunner.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        Assert.assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(), (Matcher)Matchers.is((Object)JobStatus.INITIALIZING));
        CompletableFuture cancelFuture = jobManagerRunner.cancel(TESTING_TIMEOUT);
        Assert.assertThat((Object)cancelFuture.isDone(), (Matcher)Matchers.is((Object)false));
        testAction.accept(jobManagerRunnerResultFuture);
        try {
            cancelFuture.get();
            Assert.fail();
        }
        catch (Throwable t) {
            Assert.assertThat((Object)t, (Matcher)FlinkMatchers.containsMessage((String)"Cancellation failed."));
        }
    }

    @Test
    public void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception {
        CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<JobManagerRunnerResult>();
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setJobManagerRunnerResultFuture(resultFuture).build()).build();
        jobManagerRunner.start();
        this.leaderElectionService.isLeader(UUID.randomUUID()).join();
        this.leaderElectionService.notLeader();
        resultFuture.complete(JobManagerRunnerResult.forSuccess((ExecutionGraphInfo)this.createFailedExecutionGraphInfo(new FlinkException("test exception"))));
        Assert.assertThat((Object)jobManagerRunner.getResultFuture(), (Matcher)FlinkMatchers.willNotComplete((Duration)Duration.ofMillis(5L)));
    }

    @Test
    public void testJobMasterGatewayIsInvalidatedOnLeadershipChanges() throws Exception {
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setJobMasterGatewayFuture(new CompletableFuture<JobMasterGateway>()).build()).build();
        jobManagerRunner.start();
        CompletableFuture jobMasterGateway = jobManagerRunner.getJobMasterGateway();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.leaderElectionService.notLeader();
        Assert.assertThat((Object)jobMasterGateway, (Matcher)FlinkMatchers.futureWillCompleteExceptionally((Duration)Duration.ofMillis(5L)));
    }

    @Test
    public void testLeaderAddressOfOutdatedLeaderIsIgnored() throws Exception {
        CompletableFuture<String> leaderAddressFuture = new CompletableFuture<String>();
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setLeaderAddressFuture(leaderAddressFuture).build()).build();
        jobManagerRunner.start();
        CompletableFuture<UUID> leaderFuture = this.leaderElectionService.isLeader(UUID.randomUUID());
        this.leaderElectionService.notLeader();
        leaderAddressFuture.complete("foobar");
        Assert.assertThat(leaderFuture, (Matcher)FlinkMatchers.willNotComplete((Duration)Duration.ofMillis(5L)));
    }

    @Test
    public void testInitialJobStatusIsInitializing() throws Exception {
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().build();
        jobManagerRunner.start();
        Assert.assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(), (Matcher)Matchers.is((Object)JobStatus.INITIALIZING));
    }

    @Test
    public void testCancellationChangesJobStatusToCancelling() throws Exception {
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().build();
        jobManagerRunner.start();
        jobManagerRunner.cancel(TESTING_TIMEOUT);
        Assert.assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(), (Matcher)Matchers.is((Object)JobStatus.CANCELLING));
    }

    @Test
    public void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception {
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().build();
        jobManagerRunner.start();
        jobManagerRunner.cancel(TESTING_TIMEOUT);
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.leaderElectionService.notLeader();
        Assert.assertThat(jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).join(), (Matcher)Matchers.is((Object)JobStatus.INITIALIZING));
    }

    @Test
    public void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture() throws Exception {
        CompletableFuture<Void> terminationFuture = new CompletableFuture<Void>();
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().withSingleJobMasterServiceProcess(TestingJobMasterServiceProcess.newBuilder().setTerminationFuture(terminationFuture).withManualTerminationFutureCompletion().build()).build();
        jobManagerRunner.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.leaderElectionService.notLeader();
        FlinkException testException = new FlinkException("Test exception");
        terminationFuture.completeExceptionally((Throwable)testException);
        Assert.assertThat((Object)jobManagerRunner.getResultFuture(), (Matcher)FlinkMatchers.futureWillCompleteExceptionally(cause -> ExceptionUtils.findThrowable((Throwable)cause, testException::equals).isPresent(), (Duration)Duration.ofMillis(5L), (String)"Result future should be completed exceptionally."));
    }

    @Test
    public void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture() throws Exception {
        FlinkRuntimeException testException = new FlinkRuntimeException("Test exception");
        JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().setJobMasterServiceProcessFactory(TestingJobMasterServiceProcessFactory.newBuilder().setJobMasterServiceProcessFunction(ignored -> {
            throw testException;
        }).build()).build();
        jobManagerRunner.start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        Assert.assertThat((Object)jobManagerRunner.getResultFuture(), (Matcher)FlinkMatchers.futureWillCompleteExceptionally(cause -> ExceptionUtils.findThrowable((Throwable)cause, arg_0 -> testException.equals(arg_0)).isPresent(), (Duration)Duration.ofMillis(5L), (String)"Result future should be completed exceptionally."));
    }

    @Test
    public void testJobAlreadyDone() throws Exception {
        JobID jobID = new JobID();
        try (JobMasterServiceLeadershipRunner jobManagerRunner = this.newJobMasterServiceLeadershipRunnerBuilder().setJobMasterServiceProcessFactory(TestingJobMasterServiceProcessFactory.newBuilder().setJobId(jobID).build()).build();){
            this.runningJobsRegistry.setJobFinished(jobID);
            jobManagerRunner.start();
            this.leaderElectionService.isLeader(UUID.randomUUID());
            CompletableFuture resultFuture = jobManagerRunner.getResultFuture();
            JobManagerRunnerResult result = (JobManagerRunnerResult)resultFuture.get();
            Assert.assertEquals((Object)JobStatus.FAILED, (Object)result.getExecutionGraphInfo().getArchivedExecutionGraph().getState());
        }
    }

    private void assertJobNotFinished(CompletableFuture<JobManagerRunnerResult> resultFuture) throws ExecutionException, InterruptedException {
        JobManagerRunnerResult jobManagerRunnerResult = resultFuture.get();
        Assert.assertEquals((Object)jobManagerRunnerResult.getExecutionGraphInfo().getArchivedExecutionGraph().getState(), (Object)JobStatus.SUSPENDED);
    }

    public JobMasterServiceLeadershipRunnerBuilder newJobMasterServiceLeadershipRunnerBuilder() {
        return new JobMasterServiceLeadershipRunnerBuilder();
    }

    public class JobMasterServiceLeadershipRunnerBuilder {
        private JobMasterServiceProcessFactory jobMasterServiceProcessFactory = TestingJobMasterServiceProcessFactory.newBuilder().build();
        private LibraryCacheManager.ClassLoaderLease classLoaderLease = TestingClassLoaderLease.newBuilder().build();

        public JobMasterServiceLeadershipRunnerBuilder setClassLoaderLease(LibraryCacheManager.ClassLoaderLease classLoaderLease) {
            this.classLoaderLease = classLoaderLease;
            return this;
        }

        public JobMasterServiceLeadershipRunnerBuilder setJobMasterServiceProcessFactory(JobMasterServiceProcessFactory jobMasterServiceProcessFactory) {
            this.jobMasterServiceProcessFactory = jobMasterServiceProcessFactory;
            return this;
        }

        public JobMasterServiceLeadershipRunner build() {
            return new JobMasterServiceLeadershipRunner(this.jobMasterServiceProcessFactory, (LeaderElectionService)JobMasterServiceLeadershipRunnerTest.this.leaderElectionService, JobMasterServiceLeadershipRunnerTest.this.runningJobsRegistry, this.classLoaderLease, (FatalErrorHandler)JobMasterServiceLeadershipRunnerTest.this.fatalErrorHandler);
        }

        public JobMasterServiceLeadershipRunnerBuilder withSingleJobMasterServiceProcess(JobMasterServiceProcess jobMasterServiceProcess) {
            return this.withJobMasterServiceProcesses(jobMasterServiceProcess);
        }

        public JobMasterServiceLeadershipRunnerBuilder withJobMasterServiceProcesses(JobMasterServiceProcess ... jobMasterServiceProcesses) {
            ArrayDeque<JobMasterServiceProcess> jobMasterServiceProcessQueue = new ArrayDeque<JobMasterServiceProcess>(Arrays.asList(jobMasterServiceProcesses));
            this.jobMasterServiceProcessFactory = TestingJobMasterServiceProcessFactory.newBuilder().setJobMasterServiceProcessFunction(ignored -> (JobMasterServiceProcess)Preconditions.checkNotNull(jobMasterServiceProcessQueue.poll())).build();
            return this;
        }
    }
}

