/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.AbstractDispatcherTest;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.JobMasterServiceLeadershipRunnerFactory;
import org.apache.flink.runtime.dispatcher.NoOpDispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobMasterServiceLeadershipRunnerFactory;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
import org.apache.flink.runtime.dispatcher.cleanup.TestingCleanupRunnerFactory;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.ExecutionPlanWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterService;
import org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.TestingJobMasterService;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingExecutionPlanStore;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowableAssertAlternative;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

public class DispatcherTest
extends AbstractDispatcherTest {
    /** Job graph submitted by most tests; re-created in {@link #setUp()} as a single no-op job graph. */
    private JobGraph jobGraph;
    /** ID of {@link #jobGraph}, cached in {@link #setUp()}. */
    private JobID jobId;
    /** Leader election registered with the HA services for {@link #jobId} in {@link #setUp()}. */
    private TestingLeaderElection jobMasterLeaderElection;
    /** Dispatcher under test; terminated in {@link #tearDown()} if a test assigned it. */
    private TestingDispatcher dispatcher;

    /**
     * Runs the parent set-up and then creates the per-test fixtures: a single no-op job graph,
     * its job ID, and a fresh leader election that is registered with the HA services for that
     * job ID.
     *
     * @throws Exception if the parent set-up or the fixture creation fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();

        jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        jobId = jobGraph.getJobID();

        jobMasterLeaderElection = new TestingLeaderElection();
        haServices.setJobMasterLeaderElection(jobId, jobMasterLeaderElection);
    }

    /**
     * Builds a {@link TestingDispatcher} wired to the given services and starts it.
     *
     * <p>The execution plan writer and the job result store are both taken from {@code
     * haServices}.
     *
     * @param heartbeatServices heartbeat services handed to the dispatcher builder
     * @param haServices high-availability services handed to the dispatcher builder
     * @param jobManagerRunnerFactory factory the dispatcher uses to create job manager runners
     * @return the started dispatcher
     * @throws Exception if building or starting the dispatcher fails
     */
    @Nonnull
    private TestingDispatcher createAndStartDispatcher(
            HeartbeatServices heartbeatServices,
            TestingHighAvailabilityServices haServices,
            JobManagerRunnerFactory jobManagerRunnerFactory)
            throws Exception {
        final TestingDispatcher startedDispatcher =
                createTestingDispatcherBuilder()
                        .setHighAvailabilityServices(haServices)
                        .setHeartbeatServices(heartbeatServices)
                        .setJobManagerRunnerFactory(jobManagerRunnerFactory)
                        .setExecutionPlanWriter(haServices.getExecutionPlanStore())
                        .setJobResultStore(haServices.getJobResultStore())
                        .build(rpcService);

        startedDispatcher.start();
        return startedDispatcher;
    }

    /**
     * Terminates the dispatcher RPC endpoint if the test created one, then runs the parent
     * tear-down.
     *
     * @throws Exception if terminating the endpoint or the parent tear-down fails
     */
    @Override
    @After
    public void tearDown() throws Exception {
        final TestingDispatcher startedDispatcher = dispatcher;
        if (startedDispatcher != null) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {startedDispatcher});
        }

        super.tearDown();
    }

    /**
     * Submits the job and waits for the job master leader election's start future, then asserts
     * that this future is done.
     */
    @Test
    public void testJobSubmission() throws Exception {
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId));
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        gateway.submitJob(jobGraph, TIMEOUT).get();

        // Block until the leader election has been started before asserting on it.
        jobMasterLeaderElection.getStartFuture().get();

        Assertions.assertThat(jobMasterLeaderElection.getStartFuture())
                .as("jobManagerRunner was not started")
                .isDone();
    }

    /**
     * Stores a dirty SUCCEEDED job result for the job and then expects a re-submission to be
     * rejected as a globally-terminated duplicate.
     */
    @Test
    public void testDuplicateJobSubmissionWithGloballyTerminatedButDirtyJob() throws Exception {
        final JobResult succeededResult =
                TestingJobResultStore.createJobResult(
                        jobGraph.getJobID(), ApplicationStatus.SUCCEEDED);
        final JobResultEntry dirtyEntry = new JobResultEntry(succeededResult);

        haServices.getJobResultStore().createDirtyResultAsync(dirtyEntry).get();

        assertDuplicateJobSubmission();
    }

    /**
     * Stores a SUCCEEDED job result, marks it as clean, and then expects a re-submission to be
     * rejected as a globally-terminated duplicate.
     */
    @Test
    public void testDuplicateJobSubmissionWithGloballyTerminatedAndCleanedJob() throws Exception {
        final JobResult succeededResult =
                TestingJobResultStore.createJobResult(
                        jobGraph.getJobID(), ApplicationStatus.SUCCEEDED);
        final JobResultEntry dirtyEntry = new JobResultEntry(succeededResult);

        haServices.getJobResultStore().createDirtyResultAsync(dirtyEntry).get();
        haServices.getJobResultStore().markResultAsCleanAsync(jobGraph.getJobID()).get();

        assertDuplicateJobSubmission();
    }

    /**
     * Submits the same job concurrently from {@code numThreads} threads and verifies that the job
     * status can afterwards be requested and that all but one submission failed with a
     * {@link DuplicateJobSubmissionException} as cause.
     *
     * <p>All thread-count dependent values (latch size, number of threads, expected number of
     * failures) are derived from the single {@code numThreads} constant.
     */
    @Test
    public void testDuplicateJobSubmissionIsDetectedOnSimultaneousSubmission() throws Exception {
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new TestingJobMasterServiceLeadershipRunnerFactory());
        final DispatcherGateway dispatcherGateway =
                dispatcher.getSelfGateway(DispatcherGateway.class);

        final int numThreads = 5;
        final CountDownLatch prepareLatch = new CountDownLatch(numThreads);
        final OneShotLatch startLatch = new OneShotLatch();
        final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());
        final List<Thread> threads = new ArrayList<>(numThreads);

        for (int i = 0; i < numThreads; i++) {
            threads.add(
                    new Thread(
                            () -> {
                                try {
                                    // Signal readiness, then wait so all submissions start together.
                                    prepareLatch.countDown();
                                    startLatch.awaitQuietly();
                                    dispatcherGateway.submitJob(jobGraph, TIMEOUT).join();
                                } catch (Throwable t) {
                                    exceptions.add(t);
                                }
                            }));
        }

        threads.forEach(Thread::start);

        // Release the submitters only once every thread has reached the start line.
        prepareLatch.await();
        startLatch.trigger();

        for (Thread thread : threads) {
            thread.join();
        }

        // The job status must be retrievable after the concurrent submissions.
        FlinkAssertions.assertThatFuture(
                        dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT))
                .eventuallySucceeds();

        // Exactly one submission wins; every other one is reported as a duplicate.
        Assertions.assertThat(exceptions)
                .hasSize(numThreads - 1)
                .allSatisfy(
                        t ->
                                Assertions.assertThat(t)
                                        .hasCauseInstanceOf(
                                                DuplicateJobSubmissionException.class));
    }

    /**
     * Starts a dispatcher, submits {@code jobGraph}, and asserts that the submission fails with a
     * {@link DuplicateJobSubmissionException} cause that reports the job as globally terminated.
     *
     * @throws Exception if the dispatcher cannot be created or started
     */
    private void assertDuplicateJobSubmission() throws Exception {
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId));
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        final CompletableFuture<Acknowledge> submission = gateway.submitJob(jobGraph, TIMEOUT);

        Assertions.assertThatThrownBy(submission::get)
                .hasCauseInstanceOf(DuplicateJobSubmissionException.class)
                .satisfies(
                        e ->
                                Assertions.assertThat(
                                                ((DuplicateJobSubmissionException) e.getCause())
                                                        .isGloballyTerminated())
                                        .isTrue());
    }

    /**
     * Starts a dispatcher that already has {@code jobGraph} as a recovered job and asserts that
     * submitting it again fails with a {@link DuplicateJobSubmissionException} cause that is not
     * flagged as globally terminated.
     */
    @Test
    public void testDuplicateJobSubmissionWithRunningJobId() throws Exception {
        dispatcher =
                createTestingDispatcherBuilder()
                        .setJobManagerRunnerFactory(
                                new ExpectedJobIdJobManagerRunnerFactory(jobId))
                        .setRecoveredJobs(Collections.singleton(jobGraph))
                        .build(rpcService);
        dispatcher.start();

        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);
        final CompletableFuture<Acknowledge> submission = gateway.submitJob(jobGraph, TIMEOUT);

        Assertions.assertThatThrownBy(submission::get)
                .hasCauseInstanceOf(DuplicateJobSubmissionException.class)
                .satisfies(
                        e ->
                                Assertions.assertThat(
                                                ((DuplicateJobSubmissionException) e.getCause())
                                                        .isGloballyTerminated())
                                        .isFalse());
    }

    /**
     * Submits a two-vertex job graph in which only the first vertex has resources configured and
     * asserts that the submission fails with a {@link JobSubmissionException} cause.
     */
    @Test
    public void testJobSubmissionWithPartialResourceConfigured() throws Exception {
        final ResourceSpec configuredSpec = ResourceSpec.newBuilder(2.0, 10).build();

        // Only this vertex gets explicit resources.
        final JobVertex vertexWithResources = new JobVertex("firstVertex");
        vertexWithResources.setInvokableClass(NoOpInvokable.class);
        vertexWithResources.setResources(configuredSpec, configuredSpec);

        final JobVertex vertexWithoutResources = new JobVertex("secondVertex");
        vertexWithoutResources.setInvokableClass(NoOpInvokable.class);

        final JobGraph partiallyConfiguredGraph =
                JobGraphTestUtils.streamingJobGraph(vertexWithResources, vertexWithoutResources);

        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId));
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        final CompletableFuture<Acknowledge> submission =
                gateway.submitJob(partiallyConfiguredGraph, TIMEOUT);

        Assertions.assertThatThrownBy(submission::get)
                .hasCauseInstanceOf(JobSubmissionException.class);
    }

    /**
     * Submits a job whose job master initialization is blocked and checks that, while blocked,
     * the dispatcher reports the job as INITIALIZING and lists exactly that job in the
     * multiple-job details. After unblocking, the job is awaited to reach RUNNING.
     */
    @Test
    public void testNonBlockingJobSubmission() throws Exception {
        final JobManagerRunnerWithBlockingJobMasterFactory blockingFactory =
                new JobManagerRunnerWithBlockingJobMasterFactory();
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingFactory);
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        jobMasterLeaderElection.isLeader(UUID.randomUUID());
        gateway.submitJob(jobGraph, TIMEOUT).get();
        blockingFactory.waitForBlockingInit();

        // While initialization is blocked, status queries must still be answered.
        final JobStatus statusWhileBlocked = gateway.requestJobStatus(jobId, TIMEOUT).get();
        Assertions.assertThat(statusWhileBlocked).isSameAs(JobStatus.INITIALIZING);

        final MultipleJobsDetails jobsOverview = gateway.requestMultipleJobDetails(TIMEOUT).get();
        Assertions.assertThat(jobsOverview.getJobs()).hasSize(1);
        Assertions.assertThat(jobsOverview.getJobs().iterator().next().getJobId())
                .isEqualTo(jobId);

        blockingFactory.unblockJobMasterInitialization();
        DispatcherTest.awaitStatus(gateway, jobId, JobStatus.RUNNING);
    }

    /**
     * Asserts that triggering a savepoint while the job is reported as INITIALIZING fails with an
     * {@link UnavailableDispatcherOperationException} cause.
     */
    @Test
    public void testInvalidCallDuringInitialization() throws Exception {
        final JobManagerRunnerWithBlockingJobMasterFactory blockingFactory =
                new JobManagerRunnerWithBlockingJobMasterFactory();
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingFactory);
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        gateway.submitJob(jobGraph, TIMEOUT).get();

        final JobStatus statusAfterSubmission = gateway.requestJobStatus(jobId, TIMEOUT).get();
        Assertions.assertThat(statusAfterSubmission).isSameAs(JobStatus.INITIALIZING);

        Assertions.assertThatThrownBy(
                        () ->
                                gateway.triggerSavepointAndGetLocation(
                                                jobId,
                                                "file:///tmp/savepoint",
                                                SavepointFormatType.CANONICAL,
                                                TriggerSavepointMode.SAVEPOINT,
                                                TIMEOUT)
                                        .get())
                .hasCauseInstanceOf(UnavailableDispatcherOperationException.class);
    }

    /**
     * Cancels a job that is still INITIALIZING. The test expects the status to switch to
     * CANCELLING while the cancellation request stays pending, and, once the runner factory
     * unblocks the cancellation, the request to succeed and the job result to be CANCELED.
     */
    @Test
    public void testCancellationDuringInitialization() throws Exception {
        final CancellableJobManagerRunnerWithInitializedJobFactory cancellableFactory =
                new CancellableJobManagerRunnerWithInitializedJobFactory(jobId);
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, cancellableFactory);

        jobMasterLeaderElection.isLeader(UUID.randomUUID());
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        FlinkAssertions.assertThatFuture(gateway.submitJob(jobGraph, TIMEOUT))
                .eventuallySucceeds();
        FlinkAssertions.assertThatFuture(gateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT))
                .eventuallySucceeds()
                .isEqualTo(JobStatus.INITIALIZING);

        final CompletableFuture<Acknowledge> cancellation =
                gateway.cancelJob(jobGraph.getJobID(), TIMEOUT);

        // The request is in flight: status already changed, but the future has not completed.
        FlinkAssertions.assertThatFuture(gateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT))
                .eventuallySucceeds()
                .isEqualTo(JobStatus.CANCELLING);
        FlinkAssertions.assertThatFuture(cancellation).isNotDone();

        cancellableFactory.unblockCancellation();

        FlinkAssertions.assertThatFuture(cancellation).eventuallySucceeds();
        FlinkAssertions.assertThatFuture(gateway.requestJobResult(jobGraph.getJobID(), TIMEOUT))
                .eventuallySucceeds()
                .extracting(JobResult::getApplicationStatus)
                .isEqualTo(ApplicationStatus.CANCELED);
    }

    /**
     * Completes a job in state CANCELED and verifies that a subsequent cancel request for the
     * same job completes without throwing.
     */
    @Test
    public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exception {
        final CompletableFuture<JobManagerRunnerResult> runnerResultFuture =
                new CompletableFuture<>();
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new FinishingJobManagerRunnerFactory(runnerResultFuture, () -> {}));
        jobMasterLeaderElection.isLeader(UUID.randomUUID());
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        final JobID submittedJobId = jobGraph.getJobID();
        gateway.submitJob(jobGraph, TIMEOUT).get();

        final ArchivedExecutionGraph canceledGraph =
                new ArchivedExecutionGraphBuilder()
                        .setJobID(submittedJobId)
                        .setState(JobStatus.CANCELED)
                        .build();
        runnerResultFuture.complete(
                JobManagerRunnerResult.forSuccess(new ExecutionGraphInfo(canceledGraph)));

        // Wait for the dispatcher to finish processing the termination.
        dispatcher.getJobTerminationFuture(submittedJobId, TIMEOUT).get();

        Assertions.assertThat(gateway.requestJobStatus(submittedJobId, TIMEOUT).get())
                .isSameAs(JobStatus.CANCELED);

        // Must complete normally; an exception here fails the test.
        gateway.cancelJob(submittedJobId, TIMEOUT).get();
    }

    /**
     * Completes a job in state FINISHED and verifies that a subsequent cancel request fails with
     * a {@link FlinkJobTerminatedWithoutCancellationException} cause.
     */
    @Test
    public void testCancellationOfNonCanceledTerminalJobFailsWithAppropriateException()
            throws Exception {
        final CompletableFuture<JobManagerRunnerResult> runnerResultFuture =
                new CompletableFuture<>();
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new FinishingJobManagerRunnerFactory(runnerResultFuture, () -> {}));
        jobMasterLeaderElection.isLeader(UUID.randomUUID());
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        final JobID submittedJobId = jobGraph.getJobID();
        gateway.submitJob(jobGraph, TIMEOUT).get();

        final ArchivedExecutionGraph finishedGraph =
                new ArchivedExecutionGraphBuilder()
                        .setJobID(submittedJobId)
                        .setState(JobStatus.FINISHED)
                        .build();
        runnerResultFuture.complete(
                JobManagerRunnerResult.forSuccess(new ExecutionGraphInfo(finishedGraph)));

        // Wait for the dispatcher to finish processing the termination.
        dispatcher.getJobTerminationFuture(submittedJobId, TIMEOUT).get();

        Assertions.assertThat(gateway.requestJobStatus(submittedJobId, TIMEOUT).get())
                .isSameAs(JobStatus.FINISHED);

        final CompletableFuture<Acknowledge> cancellation =
                gateway.cancelJob(submittedJobId, TIMEOUT);
        FlinkAssertions.assertThatFuture(cancellation)
                .eventuallyFails()
                .withCauseOfType(FlinkJobTerminatedWithoutCancellationException.class);
    }

    /**
     * Completes a job in state SUSPENDED and verifies that the configured history server
     * archivist was not invoked: the future it would complete is still not done after the job
     * result and status have been retrieved.
     */
    @Test
    public void testNoHistoryServerArchiveCreatedForSuspendedJob() throws Exception {
        final CompletableFuture<Void> archiveAttempted = new CompletableFuture<>();
        final CompletableFuture<JobManagerRunnerResult> runnerResultFuture =
                new CompletableFuture<>();

        dispatcher =
                createTestingDispatcherBuilder()
                        .setJobManagerRunnerFactory(
                                new FinishingJobManagerRunnerFactory(runnerResultFuture, () -> {}))
                        .setHistoryServerArchivist(
                                executionGraphInfo -> {
                                    // Record that archiving was attempted.
                                    archiveAttempted.complete(null);
                                    return CompletableFuture.completedFuture(null);
                                })
                        .build(rpcService);
        dispatcher.start();
        jobMasterLeaderElection.isLeader(UUID.randomUUID());
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        final JobID submittedJobId = jobGraph.getJobID();
        gateway.submitJob(jobGraph, TIMEOUT).get();

        final ArchivedExecutionGraph suspendedGraph =
                new ArchivedExecutionGraphBuilder()
                        .setJobID(submittedJobId)
                        .setState(JobStatus.SUSPENDED)
                        .build();
        runnerResultFuture.complete(
                JobManagerRunnerResult.forSuccess(new ExecutionGraphInfo(suspendedGraph)));

        // Wait until the job result is available before checking the status.
        gateway.requestJobResult(submittedJobId, TIMEOUT).get();

        Assertions.assertThat(gateway.requestJobStatus(submittedJobId, TIMEOUT).get())
                .isSameAs(JobStatus.SUSPENDED);
        Assertions.assertThat(archiveAttempted).isNotDone();
    }

    /**
     * Completes the job manager runner with an initialization failure and verifies that the
     * archived execution graph returned by the dispatcher is FAILED and carries failure info
     * whose deserialized exception has the test failure's message.
     */
    @Test
    public void testJobManagerRunnerInitializationFailureFailsJob() throws Exception {
        final TestingJobMasterServiceLeadershipRunnerFactory runnerFactory =
                new TestingJobMasterServiceLeadershipRunnerFactory();
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory);
        jobMasterLeaderElection.isLeader(UUID.randomUUID());
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        // An empty streaming job graph that reuses the fixture's job ID.
        final JobGraph emptyJobGraph =
                JobGraphBuilder.newStreamingJobGraphBuilder().setJobId(jobId).build();
        gateway.submitJob(emptyJobGraph, TIMEOUT).get();

        final TestingJobManagerRunner createdRunner = runnerFactory.takeCreatedJobManagerRunner();

        final FlinkException testFailure = new FlinkException("Test failure");
        final ArchivedExecutionGraph failedGraph =
                ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
                        jobId,
                        jobGraph.getName(),
                        JobStatus.FAILED,
                        null,
                        testFailure,
                        jobGraph.getCheckpointingSettings(),
                        1L);
        createdRunner.completeResultFuture(
                JobManagerRunnerResult.forInitializationFailure(
                        new ExecutionGraphInfo(failedGraph), testFailure));

        // Wait for the dispatcher to finish processing the termination.
        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

        final ArchivedExecutionGraph archivedGraph =
                gateway.requestJob(jobGraph.getJobID(), TIMEOUT).get();
        Assertions.assertThat(archivedGraph.getState()).isSameAs(JobStatus.FAILED);
        Assert.assertNotNull(archivedGraph.getFailureInfo());

        final Throwable reportedFailure =
                archivedGraph
                        .getFailureInfo()
                        .getException()
                        .deserializeError(ClassLoader.getSystemClassLoader());
        Assertions.assertThat(reportedFailure).hasMessage(testFailure.getMessage());
    }

    /**
     * Hands a FAILED execution graph info for a fresh job ID to the dispatcher and verifies that
     * both the job status and the execution graph info can afterwards be requested for that ID.
     */
    @Test
    public void testCacheJobExecutionResult() throws Exception {
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId));
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        final JobID failedJobId = new JobID();
        final JobStatus expectedState = JobStatus.FAILED;
        final ArchivedExecutionGraph failedGraph =
                new ArchivedExecutionGraphBuilder()
                        .setJobID(failedJobId)
                        .setState(expectedState)
                        .setFailureCause(new ErrorInfo(new RuntimeException("expected"), 1L))
                        .build();
        final ExecutionGraphInfo failedExecutionGraphInfo = new ExecutionGraphInfo(failedGraph);

        dispatcher.completeJobExecution(failedExecutionGraphInfo);

        Assertions.assertThat(gateway.requestJobStatus(failedJobId, TIMEOUT).get())
                .isEqualTo(expectedState);

        // The request is issued via callAsyncInMainThread rather than through the gateway.
        final CompletableFuture<ExecutionGraphInfo> cachedInfoFuture =
                dispatcher.callAsyncInMainThread(
                        () -> dispatcher.requestExecutionGraphInfo(failedJobId, TIMEOUT));
        Assertions.assertThat(cachedInfoFuture.get()).isEqualTo(failedExecutionGraphInfo);
    }

    /**
     * Submits a job whose job master gateway supplies an empty checkpoint stats snapshot and
     * verifies that the dispatcher returns exactly that snapshot within one second.
     */
    @Test
    public void testRetrieveCheckpointStats() throws Exception {
        final CheckpointStatsSnapshot expectedSnapshot = CheckpointStatsSnapshot.empty();
        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setCheckpointStatsSnapshotSupplier(
                                () -> CompletableFuture.completedFuture(expectedSnapshot))
                        .build();

        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new TestingJobMasterGatewayJobManagerRunnerFactory(jobMasterGateway));
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);
        gateway.submitJob(jobGraph, TIMEOUT).get();

        final CompletableFuture<CheckpointStatsSnapshot> statsFuture =
                dispatcher.callAsyncInMainThread(
                        () -> dispatcher.requestCheckpointStats(jobId, TIMEOUT));

        Assertions.assertThat(statsFuture).succeedsWithin(Duration.ofSeconds(1L));
        Assertions.assertThat(statsFuture).isCompletedWithValue(expectedSnapshot);
    }

    /** Runs the checkpoint-stats retrieval check for a job completed in state FAILED. */
    @Test
    public void testRetrieveCheckpointStatsOnFailedJob() throws Exception {
        final JobStatus terminalStatus = JobStatus.FAILED;
        testRetrieveCheckpointStatsWithJobStatus(terminalStatus);
    }

    /** Runs the checkpoint-stats retrieval check for a job completed in state FINISHED. */
    @Test
    public void testRetrieveCheckpointStatsOnFinishedJob() throws Exception {
        final JobStatus terminalStatus = JobStatus.FINISHED;
        testRetrieveCheckpointStatsWithJobStatus(terminalStatus);
    }

    /** Runs the checkpoint-stats retrieval check for a job completed in state CANCELED. */
    @Test
    public void testRetrieveCheckpointStatsOnCancelledJob() throws Exception {
        final JobStatus terminalStatus = JobStatus.CANCELED;
        testRetrieveCheckpointStatsWithJobStatus(terminalStatus);
    }

    /**
     * Completes a job in the given state with a checkpoint stats snapshot attached and verifies
     * that the dispatcher returns that snapshot within one second.
     *
     * @param jobStatus state of the completed job; a failure cause is attached only for
     *     {@link JobStatus#FAILED}
     * @throws Exception if the dispatcher cannot be created or started
     */
    private void testRetrieveCheckpointStatsWithJobStatus(JobStatus jobStatus) throws Exception {
        final CheckpointStatsSnapshot expectedSnapshot =
                getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints();
        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder().build();
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new TestingJobMasterGatewayJobManagerRunnerFactory(jobMasterGateway));

        final ErrorInfo failureCause;
        if (jobStatus == JobStatus.FAILED) {
            failureCause = new ErrorInfo(new RuntimeException("expected"), 1L);
        } else {
            failureCause = null;
        }

        final ArchivedExecutionGraph completedGraph =
                new ArchivedExecutionGraphBuilder()
                        .setJobID(jobId)
                        .setState(jobStatus)
                        .setCheckpointStatsSnapshot(expectedSnapshot)
                        .setFailureCause(failureCause)
                        .build();
        dispatcher.completeJobExecution(new ExecutionGraphInfo(completedGraph));

        final CompletableFuture<CheckpointStatsSnapshot> statsFuture =
                dispatcher.callAsyncInMainThread(
                        () -> dispatcher.requestCheckpointStats(jobId, TIMEOUT));

        Assertions.assertThat(statsFuture).succeedsWithin(Duration.ofSeconds(1L));
        Assertions.assertThat(statsFuture).isCompletedWithValue(expectedSnapshot);
    }

    /**
     * Creates a checkpoint stats snapshot from a fresh tracker on which {@code
     * reportFailedCheckpointsWithoutInProgress()} has been called twice.
     *
     * @return the snapshot created by the tracker
     */
    private CheckpointStatsSnapshot getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints() {
        final DefaultCheckpointStatsTracker tracker =
                new DefaultCheckpointStatsTracker(
                        10, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());

        for (int reported = 0; reported < 2; reported++) {
            tracker.reportFailedCheckpointsWithoutInProgress();
        }

        return tracker.createSnapshot();
    }

    /**
     * Requests checkpoint stats for a job that was never submitted and verifies that the request
     * fails within one second with a {@link FlinkJobNotFoundException} cause.
     */
    @Test
    public void testRetrieveCheckpointStatsOnNonExistentJob() throws Exception {
        final TestingJobMasterGateway jobMasterGateway =
                new TestingJobMasterGatewayBuilder().build();
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new TestingJobMasterGatewayJobManagerRunnerFactory(jobMasterGateway));

        final CompletableFuture<CheckpointStatsSnapshot> statsFuture =
                dispatcher.callAsyncInMainThread(
                        () -> dispatcher.requestCheckpointStats(jobId, TIMEOUT));

        Assertions.assertThat(statsFuture).failsWithin(Duration.ofSeconds(1L));
        Assertions.assertThat(statsFuture).isCompletedExceptionally();
        Assertions.assertThatThrownBy(statsFuture::get)
                .hasCauseInstanceOf(FlinkJobNotFoundException.class)
                .hasMessageContaining("Could not find Flink job");
    }

    /**
     * Requests a job under a freshly generated job ID and verifies that the request fails with a
     * {@link FlinkJobNotFoundException} cause.
     */
    @Test
    public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId));
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        Assertions.assertThatThrownBy(() -> gateway.requestJob(new JobID(), TIMEOUT).get())
                .hasCauseInstanceOf(FlinkJobNotFoundException.class);
    }

    /**
     * Creates a savepoint on disk, asks the dispatcher to dispose it, and verifies that the
     * savepoint path exists before the call and no longer exists afterwards.
     */
    @Test
    public void testSavepointDisposal() throws Exception {
        final URI savepointPointer = createTestingSavepoint();
        final Path savepointLocation = Paths.get(savepointPointer);

        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId));
        final DispatcherGateway gateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        Assertions.assertThat(Files.exists(savepointLocation)).isTrue();

        gateway.disposeSavepoint(savepointPointer.toString(), TIMEOUT).get();

        Assertions.assertThat(Files.exists(savepointLocation)).isFalse();
    }

    /**
     * Writes a savepoint consisting of empty checkpoint metadata into a new temporary folder.
     *
     * @return the external pointer of the finalized savepoint location, as a {@link URI}
     * @throws IOException if creating the folder or writing the metadata fails
     * @throws URISyntaxException if the external pointer cannot be parsed as a URI
     */
    @Nonnull
    private URI createTestingSavepoint() throws IOException, URISyntaxException {
        CheckpointStorage storage = Checkpoints.loadCheckpointStorage((Configuration)new Configuration(), (Configuration)this.configuration, (ClassLoader)Thread.currentThread().getContextClassLoader(), (Logger)this.log);
        CheckpointStorageAccess checkpointStorage = storage.createCheckpointStorage(this.jobGraph.getJobID());
        File savepointFile = this.temporaryFolder.newFolder();
        // Single source of truth for the id used by both the storage location and the metadata.
        long checkpointId = 1L;
        CheckpointStorageLocation checkpointStorageLocation = checkpointStorage.initializeLocationForSavepoint(checkpointId, savepointFile.getAbsolutePath());
        CheckpointMetadataOutputStream metadataOutputStream = checkpointStorageLocation.createMetadataOutputStream();
        // The metadata carries no operator or master states (both lists are empty).
        Checkpoints.storeCheckpointMetadata((CheckpointMetadata)new CheckpointMetadata(checkpointId, Collections.emptyList(), Collections.emptyList()), (OutputStream)metadataOutputStream);
        CompletedCheckpointStorageLocation completedCheckpointStorageLocation = metadataOutputStream.closeAndFinalizeCheckpoint();
        return new URI(completedCheckpointStorageLocation.getExternalPointer());
    }

    /**
     * Completes the recovered job's runner with an initialization-failure result and expects the
     * shared helper to observe a fatal error.
     */
    @Test
    public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws Exception {
        testJobManagerRunnerFailureResultingInFatalError((runner, failure) -> {
            // Sparse FAILED execution graph carrying the failure that is passed in.
            ArchivedExecutionGraph failedGraph = ArchivedExecutionGraph.createSparseArchivedExecutionGraph((JobID)this.jobId, (String)this.jobGraph.getName(), (JobStatus)JobStatus.FAILED, null, (Throwable)failure, (JobCheckpointingSettings)this.jobGraph.getCheckpointingSettings(), (long)1L);
            runner.completeResultFuture(JobManagerRunnerResult.forInitializationFailure((ExecutionGraphInfo)new ExecutionGraphInfo(failedGraph), (Throwable)failure));
        });
    }

    /**
     * Completes the recovered job's runner result future exceptionally and expects the shared
     * helper to observe a fatal error.
     */
    @Test
    public void testFatalErrorIfSomeOtherErrorCausedTheJobMasterToFail() throws Exception {
        testJobManagerRunnerFailureResultingInFatalError((runner, failure) -> runner.completeResultFutureExceptionally(failure));
    }

    /**
     * Shared scenario: starts a dispatcher with one recovered (empty) job graph, lets the given
     * consumer fail the created runner, and asserts that the fatal error handler received an
     * error containing the test exception's message.
     *
     * @param jobManagerRunnerWithErrorConsumer receives the created runner and the exception to
     *     fail it with
     */
    private void testJobManagerRunnerFailureResultingInFatalError(BiConsumer<TestingJobManagerRunner, Exception> jobManagerRunnerWithErrorConsumer) throws Exception {
        FlinkException expectedFailure = new FlinkException("Expected test exception");
        jobMasterLeaderElection.isLeader(UUID.randomUUID());

        TestingJobMasterServiceLeadershipRunnerFactory runnerFactory = new TestingJobMasterServiceLeadershipRunnerFactory();
        dispatcher = createTestingDispatcherBuilder()
                .setJobManagerRunnerFactory(runnerFactory)
                .setRecoveredJobs(Collections.singleton(JobGraphTestUtils.emptyJobGraph()))
                .build(rpcService);
        dispatcher.start();

        TestingFatalErrorHandler errorHandler = testingFatalErrorHandlerResource.getFatalErrorHandler();
        jobManagerRunnerWithErrorConsumer.accept(runnerFactory.takeCreatedJobManagerRunner(), (Exception)expectedFailure);

        Throwable reportedError = errorHandler.getErrorFuture().get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        Assertions.assertThat((Optional)ExceptionUtils.findThrowableWithMessage((Throwable)reportedError, (String)expectedFailure.getMessage())).isPresent();

        // Reset the handler's error state now that the expected error has been verified.
        errorHandler.clearError();
    }

    /**
     * Configures a dispatcher with a recovered job graph and a dirty job result that share the
     * same job id; the test expects an {@link IllegalArgumentException} to be thrown from this
     * body (presumably by the builder - confirm against the dispatcher implementation).
     */
    @Test(expected=IllegalArgumentException.class)
    public void testThatDirtilyFinishedJobsNotBeingRetriggered() throws Exception {
        JobGraph recoveredJobGraph = JobGraphTestUtils.emptyJobGraph();
        // A successful result registered under the very same id as the recovered job graph.
        JobResult dirtyResult = TestingJobResultStore.createSuccessfulJobResult(recoveredJobGraph.getJobID());
        dispatcher = createTestingDispatcherBuilder()
                .setRecoveredJobs(Collections.singleton(recoveredJobGraph))
                .setRecoveredDirtyJobs(Collections.singleton(dirtyResult))
                .build(rpcService);
    }

    /**
     * With only a dirty job result (and no recovered job graph), a cleanup runner must be created
     * for that job id while the regular runner factory stays unused.
     */
    @Test
    public void testJobCleanupWithoutRecoveredJobGraph() throws Exception {
        JobID dirtyJobId = new JobID();
        TestingJobMasterServiceLeadershipRunnerFactory runnerFactory = new TestingJobMasterServiceLeadershipRunnerFactory();
        TestingCleanupRunnerFactory cleanupFactory = new TestingCleanupRunnerFactory();
        OneShotLatch bootstrapLatch = new OneShotLatch();
        JobResult dirtyJobResult = new JobResult.Builder().jobId(dirtyJobId).applicationStatus(ApplicationStatus.SUCCEEDED).netRuntime(1L).build();

        dispatcher = createTestingDispatcherBuilder()
                .setJobManagerRunnerFactory(runnerFactory)
                .setCleanupRunnerFactory(cleanupFactory)
                .setRecoveredDirtyJobs(Collections.singleton(dirtyJobResult))
                .setDispatcherBootstrapFactory((ignoredDispatcherGateway, ignoredScheduledExecutor, ignoredFatalErrorHandler) -> {
                    bootstrapLatch.trigger();
                    return new NoOpDispatcherBootstrap();
                })
                .build(rpcService);
        dispatcher.start();

        // Continue only after the bootstrap factory has been invoked.
        bootstrapLatch.await();

        TestingJobManagerRunner cleanupRunner = cleanupFactory.takeCreatedJobManagerRunner();
        ((AbstractComparableAssert)Assertions.assertThat((Comparable)cleanupRunner.getJobID()).as("The CleanupJobManagerRunner has the wrong job ID attached.", new Object[0])).isEqualTo((Object)dirtyJobId);
        ((AbstractIntegerAssert)Assertions.assertThat((int)runnerFactory.getQueueSize()).as("No JobMaster should have been started.", new Object[0])).isZero();
    }

    /**
     * After a job was submitted and the dispatcher has been closed, the execution plan store
     * must still contain the submitted job.
     */
    @Test
    public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
        TestingExecutionPlanStore planStore = TestingExecutionPlanStore.newBuilder().build();
        planStore.start(null);
        haServices.setExecutionPlanStore(planStore);

        dispatcher = createTestingDispatcherBuilder()
                .setExecutionPlanWriter((ExecutionPlanWriter)planStore)
                .build(rpcService);
        dispatcher.start();

        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        Assertions.assertThat((Integer)dispatcher.getNumberJobs(TIMEOUT).get()).isOne();

        dispatcher.close();

        // Closing the dispatcher must not have removed the plan from the store.
        Assertions.assertThat((boolean)planStore.contains(jobGraph.getJobID())).isTrue();
    }

    /**
     * A pending job result request must complete once the dispatcher is closed, reporting
     * {@code ApplicationStatus.UNKNOWN}.
     */
    @Test
    public void testJobSuspensionWhenDispatcherIsTerminated() throws Exception {
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(jobId));
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();

        CompletableFuture pendingResult = gateway.requestJobResult(jobGraph.getJobID(), TIMEOUT);
        // The job has not finished yet, so there is no result at this point.
        Assertions.assertThat((CompletableFuture)pendingResult).isNotDone();

        dispatcher.close();

        JobResult resultAfterClose = (JobResult)pendingResult.get();
        Assertions.assertThat((Comparable)resultAfterClose.getApplicationStatus()).isSameAs((Object)ApplicationStatus.UNKNOWN);
    }

    /*
     * WARNING (decompiler): a try block that caught itself was removed while reconstructing
     * this method, so its behaviour may differ from the original source.
     */
    /**
     * While the dispatcher's termination is held up by a runner whose termination is blocked,
     * the other submitted job must eventually be reported as {@code SUSPENDED}.
     */
    @Test
    public void testJobStatusIsShownDuringTermination() throws Exception {
        JobID blockedJobId = new JobID();
        haServices.setJobMasterLeaderElection(blockedJobId, new TestingLeaderElection());
        JobManagerRunnerWithBlockingTerminationFactory blockingFactory = new JobManagerRunnerWithBlockingTerminationFactory(blockedJobId);
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingFactory);
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);

        JobGraph blockedJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        blockedJobGraph.setJobID(blockedJobId);
        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        gateway.submitJob((ExecutionPlan)blockedJobGraph, TIMEOUT).get();

        CompletableFuture dispatcherTermination = dispatcher.closeAsync();
        try {
            // Poll the status of the regular job until it is reported as suspended.
            CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> ((ExecutionGraphInfo)gateway.requestExecutionGraphInfo(jobId, TIMEOUT).get()).getArchivedExecutionGraph().getState() == JobStatus.SUSPENDED), 5L);
        }
        finally {
            // Always release the blocked runner so the dispatcher can finish terminating.
            blockingFactory.unblockTermination();
            dispatcherTermination.get();
        }
    }

    /**
     * Shutting the cluster down through the gateway must complete the dispatcher's shut-down
     * future (the final {@code get()} would otherwise block).
     */
    @Test
    public void testShutDownClusterShouldCompleteShutDownFuture() throws Exception {
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, (JobManagerRunnerFactory)JobMasterServiceLeadershipRunnerFactory.INSTANCE);
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);

        gateway.shutDownCluster().get();

        dispatcher.getShutDownFuture().get();
    }

    /**
     * Handling a removed execution plan must trigger the store's local cleanup for that job but
     * must not trigger its global cleanup.
     */
    @Test
    public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception {
        // Each future is completed with the job id passed to the corresponding cleanup function.
        CompletableFuture globalCleanupFuture = new CompletableFuture();
        CompletableFuture localCleanupFuture = new CompletableFuture();
        TestingExecutionPlanStore planStore = TestingExecutionPlanStore.newBuilder()
                .setGlobalCleanupFunction((cleanedJobId, ignoredExecutor) -> {
                    globalCleanupFuture.complete(cleanedJobId);
                    return FutureUtils.completedVoidFuture();
                })
                .setLocalCleanupFunction((cleanedJobId, ignoredExecutor) -> {
                    localCleanupFuture.complete(cleanedJobId);
                    return FutureUtils.completedVoidFuture();
                })
                .build();
        planStore.start(null);

        dispatcher = createTestingDispatcherBuilder()
                .setRecoveredJobs(Collections.singleton(jobGraph))
                .setExecutionPlanWriter((ExecutionPlanWriter)planStore)
                .build(rpcService);
        dispatcher.start();

        dispatcher.onRemovedExecutionPlan(jobGraph.getJobID()).join();

        Assertions.assertThat((Comparable)((JobID)localCleanupFuture.get())).isEqualTo((Object)jobGraph.getJobID());
        // The global cleanup future must stay incomplete, so a short wait has to time out.
        Assertions.assertThatThrownBy(() -> globalCleanupFuture.get(10L, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class);
    }

    /**
     * The initialization timestamp captured by the runner factory on job submission must be
     * a positive value.
     */
    @Test
    public void testInitializationTimestampForwardedToJobManagerRunner() throws Exception {
        ArrayBlockingQueue<Long> capturedTimestamps = new ArrayBlockingQueue<>(1);
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new InitializationTimestampCapturingJobManagerRunnerFactory(capturedTimestamps));
        jobMasterLeaderElection.isLeader(UUID.randomUUID());

        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();

        // Blocks until the factory has published the timestamp it was handed.
        long forwardedTimestamp = capturedTimestamps.take();
        Assertions.assertThat(forwardedTimestamp).isGreaterThan(0L);
    }

    /**
     * After a job whose runner completed with {@code SUSPENDED} has terminated, the job overview
     * must list exactly that one job in state {@code SUSPENDED}.
     */
    @Test
    public void testRequestMultipleJobDetails_returnsSuspendedJobs() throws Exception {
        JobManagerRunner suspendedRunner = completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED);
        QueuedJobManagerRunnerFactory runnerFactory = new QueuedJobManagerRunnerFactory(suspendedRunner);
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory);
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        jobMasterLeaderElection.isLeader(UUID.randomUUID());

        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

        MultipleJobsDetails overview = (MultipleJobsDetails)gateway.requestMultipleJobDetails(TIMEOUT).get();
        assertOnlyContainsSingleJobWithState(JobStatus.SUSPENDED, overview);
    }

    /**
     * Submits the same job twice: the first runner completes as {@code SUSPENDED}, the second is
     * {@code RUNNING}. The job overview must list the job once, in state {@code RUNNING}.
     */
    @Test
    public void testRequestMultipleJobDetails_returnsRunningOverSuspendedJob() throws Exception {
        JobManagerRunner suspendedRunner = completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED);
        JobManagerRunner runningRunner = runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING);
        QueuedJobManagerRunnerFactory runnerFactory = new QueuedJobManagerRunnerFactory(suspendedRunner, runningRunner);
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory);
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        jobMasterLeaderElection.isLeader(UUID.randomUUID());

        // First submission; wait for its result before submitting the job again.
        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        gateway.requestJobResult(jobId, TIMEOUT).get();

        // Second submission is served by the next queued (running) runner.
        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();

        MultipleJobsDetails overview = (MultipleJobsDetails)gateway.requestMultipleJobDetails(TIMEOUT).get();
        assertOnlyContainsSingleJobWithState(JobStatus.RUNNING, overview);
    }

    /**
     * Submits the same job twice: the first runner completes as {@code SUSPENDED}, the second as
     * {@code FINISHED}. The job overview must list the job once, in state {@code FINISHED}.
     */
    @Test
    public void testRequestMultipleJobDetails_returnsFinishedOverSuspendedJob() throws Exception {
        JobManagerRunner suspendedRunner = completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED);
        JobManagerRunner finishedRunner = completedJobManagerRunnerWithJobStatus(JobStatus.FINISHED);
        QueuedJobManagerRunnerFactory runnerFactory = new QueuedJobManagerRunnerFactory(suspendedRunner, finishedRunner);
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory);
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        jobMasterLeaderElection.isLeader(UUID.randomUUID());

        // First submission; wait for its result before submitting the job again.
        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        gateway.requestJobResult(jobId, TIMEOUT).get();

        // Second submission; wait until that job run has terminated as well.
        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

        MultipleJobsDetails overview = (MultipleJobsDetails)gateway.requestMultipleJobDetails(TIMEOUT).get();
        assertOnlyContainsSingleJobWithState(JobStatus.FINISHED, overview);
    }

    /**
     * The job overview returned by the dispatcher must be serializable; the test passes when
     * {@code InstantiationUtil.serializeObject} does not throw.
     */
    @Test
    public void testRequestMultipleJobDetails_isSerializable() throws Exception {
        JobManagerRunner suspendedRunner = completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED);
        QueuedJobManagerRunnerFactory runnerFactory = new QueuedJobManagerRunnerFactory(suspendedRunner);
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory);
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        jobMasterLeaderElection.isLeader(UUID.randomUUID());

        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

        MultipleJobsDetails overview = (MultipleJobsDetails)gateway.requestMultipleJobDetails(TIMEOUT).get();
        InstantiationUtil.serializeObject((Object)overview);
    }

    /**
     * Verifies that parallelism overrides are applied to the job graph when it is submitted.
     *
     * <p>Overrides are configured both in the dispatcher configuration and in the job
     * configuration. Per the assertions below, the job-level value wins for a vertex present in
     * both ({@code v3}), vertices without an override keep their parallelism ({@code v2}), and
     * overrides for unknown vertex ids do not make the submission fail.
     */
    @Test
    public void testOverridingJobVertexParallelisms() throws Exception {
        JobVertex v1 = new JobVertex("v1");
        v1.setParallelism(1);
        JobVertex v2 = new JobVertex("v2");
        v2.setParallelism(2);
        JobVertex v3 = new JobVertex("v3");
        v3.setParallelism(3);
        this.jobGraph = new JobGraph(this.jobGraph.getJobID(), "job", new JobVertex[]{v1, v2, v3});
        // Dispatcher-level overrides: v1 -> 10, v3 -> 21, plus an id that matches no vertex.
        this.configuration.set(PipelineOptions.PARALLELISM_OVERRIDES, (Object)ImmutableMap.of((Object)v1.getID().toHexString(), (Object)"10", (Object)v3.getID().toHexString(), (Object)"21", (Object)new JobVertexID().toHexString(), (Object)"23"));
        // Job-level overrides: v3 -> 42, plus an id that matches no vertex.
        this.jobGraph.getJobConfiguration().set(PipelineOptions.PARALLELISM_OVERRIDES, (Object)ImmutableMap.of((Object)v3.getID().toHexString(), (Object)"42", (Object)new JobVertexID().toHexString(), (Object)"25"));
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(this.jobId));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);

        // Before submission the vertices still carry their original parallelism.
        // JUnit's contract is assertEquals(expected, actual); keeping that order yields
        // correct "expected X but was Y" failure messages.
        Assert.assertEquals(1L, (long)this.jobGraph.findVertexByID(v1.getID()).getParallelism());
        Assert.assertEquals(2L, (long)this.jobGraph.findVertexByID(v2.getID()).getParallelism());
        Assert.assertEquals(3L, (long)this.jobGraph.findVertexByID(v3.getID()).getParallelism());

        dispatcherGateway.submitJob((ExecutionPlan)this.jobGraph, TIMEOUT).get();

        // After submission: v1 uses the dispatcher-level override, v2 is untouched, and v3 uses
        // the job-level override (42) rather than the dispatcher-level one (21).
        Assert.assertEquals(10L, (long)this.jobGraph.findVertexByID(v1.getID()).getParallelism());
        Assert.assertEquals(2L, (long)this.jobGraph.findVertexByID(v2.getID()).getParallelism());
        Assert.assertEquals(42L, (long)this.jobGraph.findVertexByID(v3.getID()).getParallelism());
    }

    /**
     * Builds a testing runner for {@code jobId} whose job details report the given status.
     *
     * @param currentJobStatus the status to report; must not be a terminal state
     * @return the configured testing runner
     */
    private JobManagerRunner runningJobManagerRunnerWithJobStatus(JobStatus currentJobStatus) {
        Preconditions.checkArgument(!currentJobStatus.isTerminalState());
        return TestingJobManagerRunner.newBuilder()
                .setJobId(jobId)
                .setJobDetailsFunction(() -> {
                    ArchivedExecutionGraph graph = new ArchivedExecutionGraphBuilder().setJobID(jobId).setState(currentJobStatus).build();
                    return JobDetails.createDetailsForJob((AccessExecutionGraph)graph);
                })
                .build();
    }

    /**
     * Builds a testing runner for {@code jobId} whose result future is already completed with a
     * success result carrying an execution graph in the given state.
     *
     * @param finalJobStatus the state of the archived execution graph; must be a terminal state
     * @return the configured testing runner
     */
    private JobManagerRunner completedJobManagerRunnerWithJobStatus(JobStatus finalJobStatus) {
        Preconditions.checkArgument(finalJobStatus.isTerminalState());
        ArchivedExecutionGraph terminalGraph = new ArchivedExecutionGraphBuilder().setJobID(jobId).setState(finalJobStatus).build();
        JobManagerRunnerResult successResult = JobManagerRunnerResult.forSuccess((ExecutionGraphInfo)new ExecutionGraphInfo(terminalGraph));
        return TestingJobManagerRunner.newBuilder()
                .setJobId(jobId)
                .setResultFuture(CompletableFuture.completedFuture(successResult))
                .build();
    }

    /**
     * Asserts that the overview holds exactly one job and that this job has the expected status.
     *
     * @param expectedJobStatus the status the single job must report
     * @param multipleJobsDetails the job overview to inspect
     */
    private static void assertOnlyContainsSingleJobWithState(JobStatus expectedJobStatus, MultipleJobsDetails multipleJobsDetails) {
        Collection jobs = multipleJobsDetails.getJobs();
        Assertions.assertThat((Collection)jobs).hasSize(1);

        JobDetails onlyJob = (JobDetails)jobs.iterator().next();
        Assertions.assertThat((Comparable)onlyJob.getStatus()).isEqualTo((Object)expectedJobStatus);
    }

    /**
     * Stores a permanent blob for two jobs and builds a dispatcher that recovers only the first
     * one. Afterwards the first job's blob must still be readable, while reading the second
     * job's blob must fail with a {@link NoSuchFileException}.
     */
    @Test
    public void testOnlyRecoveredJobsAreRetainedInTheBlobServer() throws Exception {
        JobID recoveredJobId = new JobID();
        JobID unrelatedJobId = new JobID();
        byte[] blobContent = {1, 2, 3, 4};

        BlobServer blobServer = getBlobServer();
        PermanentBlobKey recoveredBlobKey = blobServer.putPermanent(recoveredJobId, blobContent);
        PermanentBlobKey unrelatedBlobKey = blobServer.putPermanent(unrelatedJobId, blobContent);

        // Building the dispatcher is enough here; it is never started in this test.
        dispatcher = createTestingDispatcherBuilder()
                .setRecoveredJobs(Collections.singleton(new JobGraph(recoveredJobId, "foobar")))
                .build(rpcService);

        Assertions.assertThat((File)blobServer.getFile(recoveredJobId, recoveredBlobKey)).hasBinaryContent(blobContent);
        Assertions.assertThatThrownBy(() -> blobServer.getFile(unrelatedJobId, unrelatedBlobKey)).isInstanceOf(NoSuchFileException.class);
    }

    /**
     * Submits a job as already failed and verifies that it can afterwards be requested from the
     * dispatcher with the submitted id and name, state {@code FAILED}, and a failure info that
     * deserializes to the original {@link RuntimeException}.
     */
    @Test
    public void testRetrieveJobResultAfterSubmissionOfFailedJob() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(this.jobId));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        JobID failedJobId = new JobID();
        // Used for both the submission and the later name assertion, so the two cannot drift apart.
        String failedJobName = "test";
        CompletableFuture submitFuture = dispatcherGateway.submitFailedJob(failedJobId, failedJobName, (Throwable)new RuntimeException("Test exception."));
        submitFuture.get();
        ArchivedExecutionGraph archivedExecutionGraph = (ArchivedExecutionGraph)dispatcherGateway.requestJob(failedJobId, TIMEOUT).get();
        Assertions.assertThat((Comparable)archivedExecutionGraph.getJobID()).isEqualTo((Object)failedJobId);
        Assertions.assertThat((String)archivedExecutionGraph.getJobName()).isEqualTo(failedJobName);
        Assertions.assertThat((Comparable)archivedExecutionGraph.getState()).isEqualTo((Object)JobStatus.FAILED);
        // The stored failure must deserialize back into the submitted exception type and message.
        ((ObjectAssert)Assertions.assertThat((Object)archivedExecutionGraph.getFailureInfo()).isNotNull()).extracting(ErrorInfo::getException).extracting(e -> e.deserializeError(Thread.currentThread().getContextClassLoader())).satisfies(new ThrowingConsumer[]{exception -> ((AbstractThrowableAssert)Assertions.assertThat((Throwable)exception).isInstanceOf(RuntimeException.class)).hasMessage("Test exception.")});
    }

    /**
     * Updating a job with empty resource requirements must fail with a
     * {@code RestHandlerException}, must not write to the execution plan store, and must leave
     * the job's requirements different from the rejected (empty) ones.
     */
    @Test
    public void testInvalidResourceRequirementsUpdate() throws Exception {
        this.configuration.set(JobManagerOptions.SCHEDULER, (Object)JobManagerOptions.SchedulerType.Adaptive);

        // The store completes whatever future this reference holds each time a plan is put;
        // the reference stays empty during the initial submission.
        AtomicReference persistedFutureHolder = new AtomicReference();
        TestingExecutionPlanStore planStore = TestingExecutionPlanStore.newBuilder().setPutExecutionPlanConsumer((org.apache.flink.util.function.ThrowingConsumer<ExecutionPlan, ? extends Exception>)((org.apache.flink.util.function.ThrowingConsumer)ignoredPlan -> Optional.ofNullable((CompletableFuture)persistedFutureHolder.get()).map(f -> f.complete(null)))).build();
        haServices.setExecutionPlanStore(planStore);
        planStore.start(null);

        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, (JobManagerRunnerFactory)JobMasterServiceLeadershipRunnerFactory.INSTANCE);
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        jobMasterLeaderElection.isLeader(UUID.randomUUID());
        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        DispatcherTest.awaitStatus(gateway, jobId, JobStatus.CREATED);

        // From here on, any write to the store would complete this future.
        CompletableFuture persistedFuture = new CompletableFuture();
        persistedFutureHolder.set(persistedFuture);

        FlinkAssertions.assertThatFuture((CompletableFuture)gateway.updateJobResourceRequirements(jobId, JobResourceRequirements.empty()))
                .eventuallyFailsWith(ExecutionException.class)
                .withCauseInstanceOf(RestHandlerException.class);
        FlinkAssertions.assertThatFuture(persistedFuture).willNotCompleteWithin(Duration.ofMillis(5L));
        FlinkAssertions.assertThatFuture((CompletableFuture)gateway.requestJobResourceRequirements(jobId))
                .eventuallySucceeds()
                .isNotEqualTo((Object)JobResourceRequirements.empty());
    }

    /**
     * When persisting new job resource requirements fails in the store, the update must fail
     * with a {@code RestHandlerException} carrying {@code INTERNAL_SERVER_ERROR}, and the job's
     * requirements must not equal the attempted ones afterwards.
     */
    @Test
    public void testPersistErrorHandling() throws Exception {
        this.configuration.set(JobManagerOptions.SCHEDULER, (Object)JobManagerOptions.SchedulerType.Adaptive);

        // Store that rejects every attempt to persist job resource requirements.
        TestingExecutionPlanStore failingStore = TestingExecutionPlanStore.newBuilder().setPutJobResourceRequirementsConsumer((BiConsumerWithException<ExecutionPlan, JobResourceRequirements, ? extends Exception>)((BiConsumerWithException)(i1, i2) -> {
            throw new RuntimeException("artificial persist failure");
        })).build();
        haServices.setExecutionPlanStore(failingStore);
        failingStore.start(null);

        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, (JobManagerRunnerFactory)JobMasterServiceLeadershipRunnerFactory.INSTANCE);
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        jobMasterLeaderElection.isLeader(UUID.randomUUID());
        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        DispatcherTest.awaitStatus(gateway, jobId, JobStatus.CREATED);

        JobVertex firstVertex = (JobVertex)jobGraph.getVertices().iterator().next();
        JobResourceRequirements attemptedRequirements = JobResourceRequirements.newBuilder().setParallelismForJobVertex(firstVertex.getID(), 1, 32).build();

        ((ThrowableAssertAlternative)FlinkAssertions.assertThatFuture((CompletableFuture)gateway.updateJobResourceRequirements(jobId, attemptedRequirements)).eventuallyFailsWith(ExecutionException.class).havingCause().isInstanceOf(RestHandlerException.class)).satisfies(new ThrowingConsumer[]{e -> Assertions.assertThat((Comparable)((RestHandlerException)e).getHttpResponseStatus()).isSameAs((Object)HttpResponseStatus.INTERNAL_SERVER_ERROR)});
        FlinkAssertions.assertThatFuture((CompletableFuture)gateway.requestJobResourceRequirements(jobId))
                .eventuallySucceeds()
                .isNotEqualTo((Object)attemptedRequirements);
    }

    /**
     * Walks through three phases of a job's life: before submission the update fails with
     * {@code FlinkJobNotFoundException}, while the job master initialization is blocked it fails
     * with {@code UnavailableDispatcherOperationException}, and once the job is {@code RUNNING}
     * the update succeeds.
     */
    @Test
    public void testJobResourceRequirementsCanBeOnlyUpdatedOnInitializedJobMasters() throws Exception {
        JobManagerRunnerWithBlockingJobMasterFactory blockingFactory = new JobManagerRunnerWithBlockingJobMasterFactory(this::withRequestJobResponse);
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingFactory);
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        jobMasterLeaderElection.isLeader(UUID.randomUUID());

        // Phase 1: the job has not been submitted yet.
        FlinkAssertions.assertThatFuture((CompletableFuture)gateway.updateJobResourceRequirements(jobId, JobResourceRequirements.empty()))
                .eventuallyFailsWith(ExecutionException.class)
                .withCauseInstanceOf(FlinkJobNotFoundException.class);

        // Phase 2: the job is submitted but its job master is still initializing.
        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        blockingFactory.waitForBlockingInit();
        try {
            FlinkAssertions.assertThatFuture((CompletableFuture)gateway.updateJobResourceRequirements(jobId, JobResourceRequirements.empty()))
                    .eventuallyFailsWith(ExecutionException.class)
                    .withCauseInstanceOf(UnavailableDispatcherOperationException.class);
        }
        finally {
            // Release the initialization even if the assertion above failed.
            blockingFactory.unblockJobMasterInitialization();
        }

        // Phase 3: the job is running.
        DispatcherTest.awaitStatus(gateway, jobId, JobStatus.RUNNING);
        FlinkAssertions.assertThatFuture((CompletableFuture)gateway.updateJobResourceRequirements(jobId, getJobRequirements())).eventuallySucceeds();
    }

    /**
     * Runs the concurrent-modification scenario for two independent jobs whose job master
     * updates are blocked on one shared future, then completes that future and expects both
     * pending updates to succeed.
     */
    @Test
    public void testJobResourceRequirementsAreGuardedAgainstConcurrentModification() throws Exception {
        // Every update forwarded to a job master stays pending until this future is completed.
        CompletableFuture<Acknowledge> jobMasterUpdateGate = new CompletableFuture<>();
        JobManagerRunnerWithBlockingJobMasterFactory blockingFactory = new JobManagerRunnerWithBlockingJobMasterFactory(builder -> this.withRequestJobResponse((TestingJobMasterGatewayBuilder)builder).setUpdateJobResourceRequirementsFunction(jobResourceRequirements -> jobMasterUpdateGate));
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingFactory);
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);

        // Two clones of the test job graph, each under its own generated job id.
        JobGraph firstJob = (JobGraph)InstantiationUtil.clone((Serializable)jobGraph);
        firstJob.setJobID(JobID.generate());
        JobGraph secondJob = (JobGraph)InstantiationUtil.clone((Serializable)jobGraph);
        secondJob.setJobID(JobID.generate());

        CompletableFuture<?> firstPendingUpdate = testConcurrentModificationIsPrevented(gateway, blockingFactory, firstJob);
        CompletableFuture<?> secondPendingUpdate = testConcurrentModificationIsPrevented(gateway, blockingFactory, secondJob);
        Assertions.assertThat(firstPendingUpdate).isNotCompleted();
        Assertions.assertThat(secondPendingUpdate).isNotCompleted();

        jobMasterUpdateGate.complete(Acknowledge.get());

        FlinkAssertions.assertThatFuture(firstPendingUpdate).eventuallySucceeds();
        FlinkAssertions.assertThatFuture(secondPendingUpdate).eventuallySucceeds();
    }

    /**
     * Submits the given job, waits until it is {@code RUNNING}, issues one requirements update,
     * and asserts that a second update issued while the first is still pending fails with a
     * {@code RestHandlerException} carrying {@code CONFLICT}.
     *
     * @param dispatcherGateway gateway used to submit the job and issue the updates
     * @param blockingJobMaster factory whose job master initialization is unblocked here
     * @param jobGraph the job to submit
     * @return the future of the first update, which is still pending when this method returns
     */
    private CompletableFuture<?> testConcurrentModificationIsPrevented(DispatcherGateway dispatcherGateway, JobManagerRunnerWithBlockingJobMasterFactory blockingJobMaster, JobGraph jobGraph) throws Exception {
        // Each job gets its own leader election, which is granted leadership right away.
        TestingLeaderElection leaderElection = new TestingLeaderElection();
        haServices.setJobMasterLeaderElection(jobGraph.getJobID(), leaderElection);
        leaderElection.isLeader(UUID.randomUUID());

        FlinkAssertions.assertThatFuture((CompletableFuture)dispatcherGateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT)).eventuallySucceeds();
        blockingJobMaster.unblockJobMasterInitialization();
        DispatcherTest.awaitStatus(dispatcherGateway, jobGraph.getJobID(), JobStatus.RUNNING);

        CompletableFuture firstUpdate = dispatcherGateway.updateJobResourceRequirements(jobGraph.getJobID(), getJobRequirements());

        // A second update for the same job must be rejected while the first one is in flight.
        ((ThrowableAssertAlternative)FlinkAssertions.assertThatFuture((CompletableFuture)dispatcherGateway.updateJobResourceRequirements(jobGraph.getJobID(), getJobRequirements())).eventuallyFailsWith(ExecutionException.class).havingCause().isInstanceOf(RestHandlerException.class)).satisfies(new ThrowingConsumer[]{e -> Assertions.assertThat((Comparable)((RestHandlerException)e).getHttpResponseStatus()).isSameAs((Object)HttpResponseStatus.CONFLICT)});

        FlinkAssertions.assertThatFuture((CompletableFuture)firstUpdate).isNotCompleted();
        return firstUpdate;
    }

    /**
     * With a job master that acknowledges updates immediately, two consecutive requirements
     * updates for a running job must both succeed.
     */
    @Test
    public void testJobResourceRequirementsCanBeUpdatedSequentially() throws Exception {
        JobManagerRunnerWithBlockingJobMasterFactory blockingFactory = new JobManagerRunnerWithBlockingJobMasterFactory(builder -> this.withRequestJobResponse((TestingJobMasterGatewayBuilder)builder).setUpdateJobResourceRequirementsFunction(jobResourceRequirements -> CompletableFuture.completedFuture(Acknowledge.get())));
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingFactory);
        DispatcherGateway gateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        jobMasterLeaderElection.isLeader(UUID.randomUUID());

        gateway.submitJob((ExecutionPlan)jobGraph, TIMEOUT).get();
        blockingFactory.waitForBlockingInit();
        blockingFactory.unblockJobMasterInitialization();
        DispatcherTest.awaitStatus(gateway, jobId, JobStatus.RUNNING);

        // Each update is awaited before the next one is issued.
        for (int attempt = 1; attempt <= 2; attempt++) {
            FlinkAssertions.assertThatFuture((CompletableFuture)gateway.updateJobResourceRequirements(jobGraph.getJobID(), getJobRequirements())).eventuallySucceeds();
        }
    }

    /**
     * Builds resource requirements for the test job graph, giving every vertex a parallelism
     * range from 1 up to the vertex's own parallelism.
     *
     * @return the requirements covering all vertices of {@code jobGraph}
     */
    private JobResourceRequirements getJobRequirements() {
        JobResourceRequirements.Builder requirements = JobResourceRequirements.newBuilder();
        jobGraph.getVertices().forEach(v -> requirements.setParallelismForJobVertex(v.getID(), 1, v.getParallelism()));
        return requirements.build();
    }

    /**
     * Configures the builder so that job requests are answered with an already completed future
     * holding a sparse {@code RUNNING} streaming execution graph built from the test job graph's
     * vertices.
     *
     * @param builder the gateway builder to configure
     * @return the value returned by {@code setRequestJobSupplier}
     */
    private TestingJobMasterGatewayBuilder withRequestJobResponse(TestingJobMasterGatewayBuilder builder) {
        return builder.setRequestJobSupplier(() -> {
            // Built anew on every request, so the timestamp reflects the time of the call.
            ArchivedExecutionGraph runningGraph = ArchivedExecutionGraph.createSparseArchivedExecutionGraphWithJobVertices((JobID)this.jobGraph.getJobID(), (String)this.jobGraph.getName(), (JobStatus)JobStatus.RUNNING, (JobType)JobType.STREAMING, null, null, (long)System.currentTimeMillis(), (Iterable)this.jobGraph.getVertices(), (VertexParallelismStore)SchedulerBase.computeVertexParallelismStore((JobGraph)this.jobGraph));
            return CompletableFuture.completedFuture(new ExecutionGraphInfo(runningGraph));
        });
    }

    /**
     * Factory producing {@link TestingJobManagerRunner}s that all share one pre-supplied result
     * future and run a callback once the runner's termination future completes.
     */
    private static class FinishingJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        private final CompletableFuture<JobManagerRunnerResult> resultFuture;
        private final Runnable onClose;

        /**
         * @param resultFuture result future handed to every runner created by this factory
         * @param onClose action to run after a created runner's termination future completes
         */
        private FinishingJobManagerRunnerFactory(CompletableFuture<JobManagerRunnerResult> resultFuture, Runnable onClose) {
            this.onClose = onClose;
            this.resultFuture = resultFuture;
        }

        /**
         * Creates a testing runner for the job id of {@code graph}; all other arguments are unused.
         *
         * @return the newly built testing runner
         */
        public JobManagerRunner createJobManagerRunner(
                ExecutionPlan graph,
                Configuration configuration,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                JobManagerSharedServices jobManagerServices,
                JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
                FatalErrorHandler fatalErrorHandler,
                Collection<FailureEnricher> failureEnrichers,
                long initializationTimestamp) throws Exception {
            final TestingJobManagerRunner testingRunner = TestingJobManagerRunner.newBuilder()
                    .setJobId(graph.getJobID())
                    .setResultFuture(this.resultFuture)
                    .build();
            // Hook the callback onto termination rather than onto the result future.
            testingRunner.getTerminationFuture().thenRun(this.onClose);
            return testingRunner;
        }
    }

    /**
     * Factory that hands out pre-built runners, one per call, in the order in which they were
     * passed to the constructor.
     */
    private static class QueuedJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        private final Queue<JobManagerRunner> pendingRunners;

        /**
         * @param resultFutureQueue the runners to hand out, first element first
         */
        private QueuedJobManagerRunnerFactory(JobManagerRunner ... resultFutureQueue) {
            final List<JobManagerRunner> inOrder = Arrays.asList(resultFutureQueue);
            this.pendingRunners = new ArrayDeque<>(inOrder);
        }

        /**
         * Returns the next queued runner and ignores every argument.
         *
         * @return the head of the queue
         * @throws java.util.NoSuchElementException if all queued runners were already handed out
         */
        public JobManagerRunner createJobManagerRunner(
                ExecutionPlan graph,
                Configuration configuration,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                JobManagerSharedServices jobManagerServices,
                JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
                FatalErrorHandler fatalErrorHandler,
                Collection<FailureEnricher> failureEnrichers,
                long initializationTimestamp) throws Exception {
            final JobManagerRunner next = this.pendingRunners.remove();
            return next;
        }
    }

    private static final class ExpectedJobIdJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        private final JobID expectedJobId;

        private ExpectedJobIdJobManagerRunnerFactory(JobID expectedJobId) {
            this.expectedJobId = expectedJobId;
        }

        public JobManagerRunner createJobManagerRunner(ExecutionPlan graph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, Collection<FailureEnricher> failureEnrichers, long initializationTimestamp) throws Exception {
            Assertions.assertThat((Comparable)graph.getJobID()).isEqualTo((Object)this.expectedJobId);
            return JobMasterServiceLeadershipRunnerFactory.INSTANCE.createJobManagerRunner(graph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler, Collections.emptySet(), initializationTimestamp);
        }
    }

    /**
     * Runner factory that creates runners through the superclass and immediately completes each
     * runner's job-master-gateway future with a fixed {@link TestingJobMasterGateway}.
     */
    private static final class TestingJobMasterGatewayJobManagerRunnerFactory
    extends TestingJobMasterServiceLeadershipRunnerFactory {
        private final TestingJobMasterGateway testingJobMasterGateway;

        /**
         * @param testingJobMasterGateway gateway used to complete the gateway future of every runner
         */
        private TestingJobMasterGatewayJobManagerRunnerFactory(TestingJobMasterGateway testingJobMasterGateway) {
            this.testingJobMasterGateway = testingJobMasterGateway;
        }

        /**
         * Creates a runner via the superclass and completes its gateway future before returning it.
         *
         * @return the runner produced by the superclass, with its gateway future already completed
         * @throws Exception if the superclass fails to create the runner
         */
        @Override
        public TestingJobManagerRunner createJobManagerRunner(ExecutionPlan executionPlan, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, Collection<FailureEnricher> failureEnrichers, long initializationTimestamp) throws Exception {
            // The local must be typed as TestingJobManagerRunner: completeJobMasterGatewayFuture is
            // called on it and it is returned under this method's TestingJobManagerRunner return type.
            final TestingJobManagerRunner runner = super.createJobManagerRunner(executionPlan, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerServices, jobManagerJobMetricGroupFactory, fatalErrorHandler, failureEnrichers, initializationTimestamp);
            runner.completeJobMasterGatewayFuture(this.testingJobMasterGateway);
            return runner;
        }
    }

    /**
     * Factory that records the initialization timestamp of every runner-creation request in a
     * queue supplied by the test and returns a plain {@link TestingJobManagerRunner}.
     */
    private static final class InitializationTimestampCapturingJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        private final BlockingQueue<Long> initializationTimestampQueue;

        /**
         * @param initializationTimestampQueue queue that receives the captured timestamps
         */
        private InitializationTimestampCapturingJobManagerRunnerFactory(BlockingQueue<Long> initializationTimestampQueue) {
            this.initializationTimestampQueue = initializationTimestampQueue;
        }

        /**
         * Offers {@code initializationTimestamp} to the queue, then builds a testing runner for the
         * job id of {@code graph}.
         *
         * @return a new testing runner carrying the job id of {@code graph}
         */
        public JobManagerRunner createJobManagerRunner(
                ExecutionPlan graph,
                Configuration configuration,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                JobManagerSharedServices jobManagerServices,
                JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
                FatalErrorHandler fatalErrorHandler,
                Collection<FailureEnricher> failureEnrichers,
                long initializationTimestamp) {
            // offer() is non-blocking; its result is not inspected, so a full queue drops the value.
            this.initializationTimestampQueue.offer(Long.valueOf(initializationTimestamp));
            final TestingJobManagerRunner testingRunner = TestingJobManagerRunner.newBuilder()
                    .setJobId(graph.getJobID())
                    .build();
            return testingRunner;
        }
    }

    /**
     * Leadership runner whose shutdown is held back for one particular job until an externally
     * controlled future completes; runners for any other job close immediately.
     */
    private static final class BlockingTerminationJobManagerService
    extends JobMasterServiceLeadershipRunner {
        private final JobID jobIdToBlock;
        private final CompletableFuture<Void> future;

        /**
         * @param jobIdToBlock id of the job whose termination is delayed
         * @param future gate that must complete before the blocked job's runner is closed
         * @param jobMasterServiceProcessFactory forwarded to the superclass
         * @param leaderElection forwarded to the superclass
         * @param jobResultStore forwarded to the superclass
         * @param classLoaderLease forwarded to the superclass
         * @param fatalErrorHandler forwarded to the superclass
         */
        public BlockingTerminationJobManagerService(JobID jobIdToBlock, CompletableFuture<Void> future, JobMasterServiceProcessFactory jobMasterServiceProcessFactory, LeaderElection leaderElection, JobResultStore jobResultStore, LibraryCacheManager.ClassLoaderLease classLoaderLease, FatalErrorHandler fatalErrorHandler) {
            super(jobMasterServiceProcessFactory, leaderElection, jobResultStore, classLoaderLease, fatalErrorHandler);
            this.future = future;
            this.jobIdToBlock = jobIdToBlock;
        }

        /**
         * Closes this runner. For the blocked job the superclass shutdown is started only after the
         * gate future has completed (normally or exceptionally).
         *
         * @return a future that completes once the gate has completed AND the superclass shutdown
         *     has finished; it completes exceptionally if either of the two failed
         */
        public CompletableFuture<Void> closeAsync() {
            if (!this.jobIdToBlock.equals(this.getJobID())) {
                return super.closeAsync();
            }
            final CompletableFuture<Void> gate = this.future;
            return gate
                    // Start the real shutdown regardless of how the gate completed.
                    .handle((ignoredResult, ignoredFailure) -> super.closeAsync())
                    // Track the shutdown future instead of discarding it, so callers are not told
                    // the runner terminated while it is still closing. Combining with the gate
                    // keeps a gate failure visible in the returned future.
                    .thenCompose(closeFuture -> closeFuture.thenCombine(gate, (closed, unblocked) -> (Void)null));
        }
    }

    private static final class JobManagerRunnerWithBlockingTerminationFactory
    implements JobManagerRunnerFactory {
        private final JobID jobIdToBlock;
        private final CompletableFuture<Void> future;

        public JobManagerRunnerWithBlockingTerminationFactory(JobID jobIdToBlock) {
            this.jobIdToBlock = jobIdToBlock;
            this.future = new CompletableFuture();
        }

        public JobManagerRunner createJobManagerRunner(ExecutionPlan executionPlan, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, Collection<FailureEnricher> failureEnrichers, long initializationTimestamp) throws Exception {
            return new BlockingTerminationJobManagerService(this.jobIdToBlock, this.future, (JobMasterServiceProcessFactory)new DefaultJobMasterServiceProcessFactory(executionPlan.getJobID(), executionPlan.getName(), executionPlan.getJobType(), executionPlan.getCheckpointingSettings(), initializationTimestamp, (JobMasterServiceFactory)new TestingJobMasterServiceFactory()), highAvailabilityServices.getJobManagerLeaderElection(executionPlan.getJobID()), highAvailabilityServices.getJobResultStore(), jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(executionPlan.getJobID()), fatalErrorHandler);
        }

        public void unblockTermination() {
            this.future.complete(null);
        }
    }

    /**
     * Factory whose runners use a job master service that stays uninitialized until the test calls
     * {@link #unblockJobMasterInitialization()}. The gateway's request-job response reports the
     * status held in {@code currentJobStatus}, which starts as {@link JobStatus#INITIALIZING}.
     */
    private static class JobManagerRunnerWithBlockingJobMasterFactory
    implements JobManagerRunnerFactory {
        private final JobMasterGateway jobMasterGateway;
        private final AtomicReference<JobStatus> currentJobStatus = new AtomicReference<>(JobStatus.INITIALIZING);
        private final BlockingQueue<CompletableFuture<JobMasterService>> jobMasterServiceFutures = new ArrayBlockingQueue<>(2);
        private final OneShotLatch initLatch = new OneShotLatch();

        /** Creates a factory whose gateway builder is used without further modification. */
        private JobManagerRunnerWithBlockingJobMasterFactory() {
            this(Function.identity());
        }

        /**
         * @param modifyGatewayBuilder customization applied to the pre-configured gateway builder
         *     before the gateway is built
         */
        private JobManagerRunnerWithBlockingJobMasterFactory(Function<TestingJobMasterGatewayBuilder, TestingJobMasterGatewayBuilder> modifyGatewayBuilder) {
            final TestingJobMasterGatewayBuilder preconfigured = new TestingJobMasterGatewayBuilder().setRequestJobSupplier(() -> {
                // Read the status on every request so later updates become visible.
                final JobStatus reportedStatus = this.currentJobStatus.get();
                return CompletableFuture.completedFuture(new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setState(reportedStatus).build()));
            });
            this.jobMasterGateway = modifyGatewayBuilder.apply(preconfigured).build();
        }

        /**
         * Creates a leadership runner whose job master service future is left pending.
         *
         * @return the new runner
         * @throws Exception if obtaining any of the required services fails
         */
        public JobManagerRunner createJobManagerRunner(
                ExecutionPlan executionPlan,
                Configuration configuration,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                JobManagerSharedServices jobManagerServices,
                JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
                FatalErrorHandler fatalErrorHandler,
                Collection<FailureEnricher> failureEnrichers,
                long initializationTimestamp) throws Exception {
            final JobMasterServiceProcessFactory processFactory = new DefaultJobMasterServiceProcessFactory(
                    executionPlan.getJobID(),
                    executionPlan.getName(),
                    executionPlan.getJobType(),
                    executionPlan.getCheckpointingSettings(),
                    initializationTimestamp,
                    new TestingJobMasterServiceFactory(ignored -> this.registerPendingJobMasterService()));
            return new JobMasterServiceLeadershipRunner(
                    processFactory,
                    highAvailabilityServices.getJobManagerLeaderElection(executionPlan.getJobID()),
                    highAvailabilityServices.getJobResultStore(),
                    jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(executionPlan.getJobID()),
                    fatalErrorHandler);
        }

        /**
         * Signals that initialization was requested and parks a fresh, incomplete service future
         * in the queue; fails with an {@code IllegalStateException} if the queue is already full.
         */
        private CompletableFuture<JobMasterService> registerPendingJobMasterService() {
            this.initLatch.trigger();
            final CompletableFuture<JobMasterService> pending = new CompletableFuture<>();
            final boolean accepted = this.jobMasterServiceFutures.offer(pending);
            Preconditions.checkState(accepted);
            return pending;
        }

        /** Blocks until the job master service initialization has been requested at least once. */
        public void waitForBlockingInit() throws InterruptedException {
            this.initLatch.await();
        }

        /**
         * Takes the oldest pending service future (waiting for one if necessary), completes it with
         * a testing service around this factory's gateway, and then switches the reported job
         * status to {@link JobStatus#RUNNING}.
         */
        public void unblockJobMasterInitialization() throws InterruptedException {
            final CompletableFuture<JobMasterService> pending = this.jobMasterServiceFutures.take();
            final TestingJobMasterService service = new TestingJobMasterService(this.jobMasterGateway);
            pending.complete(service);
            this.currentJobStatus.set(JobStatus.RUNNING);
        }
    }

    /**
     * Factory for runners whose job master is available right away and whose cancellation stays
     * pending until the test calls {@link #unblockCancellation()}. The status reported by the
     * gateway moves from {@code INITIALIZING} to {@code CANCELLING} when cancel is invoked and to
     * {@code CANCELED} once the cancellation future completes.
     */
    private static class CancellableJobManagerRunnerWithInitializedJobFactory
    implements JobManagerRunnerFactory {
        private final JobID expectedJobId;
        private final AtomicReference<JobStatus> jobStatus = new AtomicReference<>(JobStatus.INITIALIZING);
        private final CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();

        /**
         * @param expectedJobId the job id every submitted execution plan must have
         */
        private CancellableJobManagerRunnerWithInitializedJobFactory(JobID expectedJobId) {
            this.expectedJobId = expectedJobId;
        }

        /**
         * Asserts the job id of {@code graph} and builds a leadership runner around a testing job
         * master service that reports a globally terminal {@code CANCELED} state once the
         * cancellation future completes.
         *
         * @return the new runner
         * @throws Exception if obtaining any of the required services fails
         */
        public JobManagerRunner createJobManagerRunner(
                ExecutionPlan graph,
                Configuration configuration,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                JobManagerSharedServices jobManagerServices,
                JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
                FatalErrorHandler fatalErrorHandler,
                Collection<FailureEnricher> failureEnricher,
                long initializationTimestamp) throws Exception {
            Assertions.assertThat((Comparable)graph.getJobID()).isEqualTo((Object)this.expectedJobId);

            final TestingJobMasterGateway gateway = this.createGateway();
            final TestingJobMasterServiceFactory serviceFactory = new TestingJobMasterServiceFactory(completionActions -> {
                final TestingJobMasterService service = new TestingJobMasterService(gateway);
                // Report the terminal state only after the test has released the cancellation.
                this.cancellationFuture.thenRun(() -> {
                    final ExecutionGraphInfo canceledInfo = new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setJobID(graph.getJobID()).setState(JobStatus.CANCELED).build());
                    completionActions.jobReachedGloballyTerminalState(canceledInfo);
                });
                return CompletableFuture.completedFuture(service);
            });

            final JobMasterServiceProcessFactory processFactory = new DefaultJobMasterServiceProcessFactory(
                    graph.getJobID(),
                    graph.getName(),
                    graph.getJobType(),
                    graph.getCheckpointingSettings(),
                    initializationTimestamp,
                    serviceFactory);
            return new JobMasterServiceLeadershipRunner(
                    processFactory,
                    highAvailabilityServices.getJobManagerLeaderElection(graph.getJobID()),
                    highAvailabilityServices.getJobResultStore(),
                    jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(graph.getJobID()),
                    fatalErrorHandler);
        }

        /**
         * Builds the gateway: request-job answers with the current {@code jobStatus}; cancel marks
         * the job {@code CANCELLING} and acknowledges only after the cancellation future completes,
         * at which point the status becomes {@code CANCELED}.
         */
        private TestingJobMasterGateway createGateway() {
            return new TestingJobMasterGatewayBuilder()
                    .setRequestJobSupplier(() -> CompletableFuture.completedFuture(
                            new ExecutionGraphInfo(new ArchivedExecutionGraphBuilder().setState(this.jobStatus.get()).build())))
                    .setCancelFunction(() -> {
                        this.jobStatus.set(JobStatus.CANCELLING);
                        return this.cancellationFuture.thenApply(ignored -> {
                            this.jobStatus.set(JobStatus.CANCELED);
                            return Acknowledge.get();
                        });
                    })
                    .build();
        }

        /** Completes the cancellation future, letting pending cancel requests finish. */
        public void unblockCancellation() {
            this.cancellationFuture.complete(null);
        }
    }
}

