/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationRejection;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterTestUtils;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotPoolServiceSchedulerFactory;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation;
import org.apache.flink.runtime.jobmaster.TestUtils;
import org.apache.flink.runtime.jobmaster.TestingJobManagerSharedServicesBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolServiceFactory;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolServiceBuilder;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
import org.apache.flink.runtime.scheduler.TestingSchedulerNGFactory;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.QuadFunction;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.TriFunction;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class JobMasterTest {
    // Zero-length TestingInputSplit array constant; its uses lie outside this excerpt.
    private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];
    // Per-test temporary directory injected through @TempDir; setup() creates the blob storage
    // directory inside it.
    @TempDir
    private Path temporaryFolder;
    // Ten-second timeout passed to gateway calls and used to bound several future lookups.
    private static final Time testingTimeout = Time.seconds((long)10L);
    // Interval/timeout pair matching the values of fastHeartbeatServices (see setupAll()).
    // NOTE(review): unit is presumably milliseconds - confirm against HeartbeatServices.
    private static final long fastHeartbeatInterval = 1L;
    private static final long fastHeartbeatTimeout = 10L;
    // Interval/timeout pair matching the values of heartbeatServices (see setupAll()).
    private static final long heartbeatInterval = 1000L;
    private static final long heartbeatTimeout = 5000000L;
    // Job graph shared by most tests, built once by JobGraphTestUtils.singleNoOpJobGraph().
    private static final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
    // RPC service shared by all tests: created in setupAll(), gateways cleared in teardown(),
    // stopped in teardownAll().
    private static TestingRpcService rpcService;
    // Heartbeat services built in setupAll() from the "fast" values above.
    private static HeartbeatServices fastHeartbeatServices;
    // Heartbeat services built in setupAll() from the regular values above.
    private static HeartbeatServices heartbeatServices;
    // Per-test state; every field below is (re)assigned in setup().
    private Configuration configuration;
    private ResourceID jmResourceId;
    private JobMasterId jobMasterId;
    private TestingHighAvailabilityServices haServices;
    // Leader retrieval service registered on haServices as the resource manager leader retriever.
    private SettableLeaderRetrievalService rmLeaderRetrievalService;
    // Checked in teardown() via rethrowError().
    private TestingFatalErrorHandler testingFatalErrorHandler;

    // Explicit package-private no-arg constructor with an empty body; per-test state is
    // initialised in setup() instead.
    JobMasterTest() {
    }

    /**
     * Creates the RPC service and the two heartbeat service flavours shared by all tests of
     * this class.
     */
    @BeforeAll
    static void setupAll() {
        rpcService = new TestingRpcService();
        // Same values as before, expressed through the named constants of this class.
        fastHeartbeatServices =
                new HeartbeatServices(fastHeartbeatInterval, fastHeartbeatTimeout, -1);
        heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout, 1);
    }

    /**
     * Builds fresh per-test state: configuration, IDs, fatal error handler and HA services
     * (with a standalone checkpoint recovery factory and a settable resource manager leader
     * retriever), and points the blob storage directory at a new temp directory.
     *
     * @throws IOException if the temporary blob storage directory cannot be created
     */
    @BeforeEach
    void setup() throws IOException {
        this.configuration = new Configuration();
        this.haServices = new TestingHighAvailabilityServices();
        this.jobMasterId = JobMasterId.generate();
        this.jmResourceId = ResourceID.generate();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();

        this.haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());

        this.rmLeaderRetrievalService = new SettableLeaderRetrievalService(null, null);
        this.haServices.setResourceManagerLeaderRetriever(this.rmLeaderRetrievalService);

        // Each test gets its own uniquely named blob directory below the JUnit temp folder.
        final Path blobStorageDirectory =
                Files.createTempDirectory(this.temporaryFolder, UUID.randomUUID().toString());
        this.configuration.setString(
                BlobServerOptions.STORAGE_DIRECTORY, blobStorageDirectory.toString());
    }

    /**
     * Surfaces any fatal error recorded during the test and clears the gateways registered on
     * the shared RPC service.
     *
     * <p>The gateways are cleared in a {@code finally} block: {@code rpcService} is static and
     * reused by every test, so skipping the cleanup when {@code rethrowError()} throws would
     * leave this test's gateways registered for the tests that follow.
     *
     * @throws Exception whatever the fatal error handler rethrows
     */
    @AfterEach
    void teardown() throws Exception {
        try {
            if (this.testingFatalErrorHandler != null) {
                this.testingFatalErrorHandler.rethrowError();
            }
        } finally {
            rpcService.clearGateways();
        }
    }

    /** Stops the shared RPC service, if one was created, and drops the reference to it. */
    @AfterAll
    static void teardownAll() {
        if (rpcService == null) {
            return;
        }
        rpcService.stopService();
        rpcService = null;
    }

    /**
     * Registers a task manager at a started JobMaster and checks the resource ID that is handed
     * to the task executor's heartbeat callback: it must be either {@code null} or the
     * JobMaster's own resource ID.
     */
    @Test
    void testTaskManagerRegistrationTriggersHeartbeating() throws Exception {
        final CompletableFuture<ResourceID> heartbeatOriginFuture = new CompletableFuture<>();
        final UnresolvedTaskManagerLocation tmLocation = new LocalUnresolvedTaskManagerLocation();
        final TestingTaskExecutorGateway tmGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setHeartbeatJobManagerFunction(
                                (heartbeatOrigin, ignoredSlotReport) -> {
                                    // Remember the first ID the heartbeat callback is invoked with.
                                    heartbeatOriginFuture.complete(heartbeatOrigin);
                                    return FutureUtils.completedVoidFuture();
                                })
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(tmGateway.getAddress(), tmGateway);

        try (JobMaster jobMaster =
                new JobMasterBuilder(jobGraph, rpcService)
                        .withResourceId(this.jmResourceId)
                        .withConfiguration(this.configuration)
                        .withHighAvailabilityServices(this.haServices)
                        .withHeartbeatServices(new HeartbeatServices(1L, 10000L))
                        .createJobMaster()) {
            jobMaster.start();

            final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            final CompletableFuture<RegistrationResponse> registration =
                    gateway.registerTaskManager(
                            jobGraph.getJobID(),
                            TaskManagerRegistrationInformation.create(
                                    tmGateway.getAddress(), tmLocation, TestingUtils.zeroUUID()),
                            testingTimeout);

            // Block until the registration call has completed.
            registration.get();

            Assertions.assertThat(heartbeatOriginFuture.join())
                    .satisfiesAnyOf(
                            resourceID -> Assertions.assertThat(resourceID).isNull(),
                            resourceID ->
                                    Assertions.assertThat(resourceID)
                                            .isEqualTo(this.jmResourceId));
        }
    }

    /**
     * Runs the shared heartbeat scenario with a task executor whose heartbeat callback completes
     * normally, using the fast heartbeat services.
     */
    @Test
    void testHeartbeatTimeoutWithTaskManager() throws Exception {
        final TestingTaskExecutorGatewayBuilder gatewayBuilder =
                new TestingTaskExecutorGatewayBuilder()
                        .setHeartbeatJobManagerFunction(
                                (taskManagerId, ignored) -> FutureUtils.completedVoidFuture());
        this.runHeartbeatTest(gatewayBuilder, fastHeartbeatServices);
    }

    /**
     * Shared scenario: registers a task manager built from the given builder at a JobMaster that
     * uses the given heartbeat services, then waits (bounded by {@code testingTimeout}) for the
     * task executor to be told to disconnect from the job manager and checks the reported job ID.
     *
     * @param testingTaskExecutorGatewayBuilder pre-configured builder for the task executor
     *     gateway; this method adds the disconnect callback
     * @param heartbeatServices heartbeat services handed to the JobMaster under test
     */
    private void runHeartbeatTest(TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder, HeartbeatServices heartbeatServices) throws Exception {
        final CompletableFuture<JobID> disconnectedJobFuture = new CompletableFuture<>();
        final UnresolvedTaskManagerLocation tmLocation = new LocalUnresolvedTaskManagerLocation();
        final TestingTaskExecutorGateway tmGateway =
                testingTaskExecutorGatewayBuilder
                        .setDisconnectJobManagerConsumer(
                                (jobId, throwable) -> disconnectedJobFuture.complete(jobId))
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(tmGateway.getAddress(), tmGateway);

        try (JobMaster jobMaster =
                new JobMasterBuilder(jobGraph, rpcService)
                        .withResourceId(this.jmResourceId)
                        .withConfiguration(this.configuration)
                        .withHighAvailabilityServices(this.haServices)
                        .withHeartbeatServices(heartbeatServices)
                        .createJobMaster()) {
            jobMaster.start();

            final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            final CompletableFuture<RegistrationResponse> registration =
                    gateway.registerTaskManager(
                            jobGraph.getJobID(),
                            TaskManagerRegistrationInformation.create(
                                    tmGateway.getAddress(), tmLocation, TestingUtils.zeroUUID()),
                            testingTimeout);

            // Block until the registration call has completed.
            registration.get();

            final JobID disconnectedJob =
                    disconnectedJobFuture.get(
                            testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assertions.assertThat(disconnectedJob).isEqualTo(jobGraph.getJobID());
        }
    }

    /**
     * Runs the shared heartbeat scenario with a task executor whose heartbeat callback fails with
     * a {@link RecipientUnreachableException}, using the regular heartbeat services.
     */
    @Test
    void testTaskManagerBecomesUnreachableTriggersDisconnect() throws Exception {
        final TestingTaskExecutorGatewayBuilder unreachableGatewayBuilder =
                new TestingTaskExecutorGatewayBuilder()
                        .setHeartbeatJobManagerFunction(
                                (taskManagerId, ignored) ->
                                        FutureUtils.completedExceptionally(
                                                new RecipientUnreachableException(
                                                        "sender",
                                                        "recipient",
                                                        "test heartbeat target is unreachable")));
        this.runHeartbeatTest(unreachableGatewayBuilder, heartbeatServices);
    }

    /**
     * Checks every allocated slot report delivered with a heartbeat: it must be empty until the
     * slot-offer latch has been triggered and contain exactly one entry afterwards. The first
     * violated assertion fails the test through {@code verificationFuture}.
     */
    @Test
    void testAllocatedSlotReportDoesNotContainStaleInformation() throws Exception {
        final CompletableFuture<Void> verificationFuture = new CompletableFuture<>();
        final UnresolvedTaskManagerLocation tmLocation = new LocalUnresolvedTaskManagerLocation();
        final AtomicBoolean stopVerification = new AtomicBoolean(false);
        final OneShotLatch hasReceivedSlotOffers = new OneShotLatch();

        final TestingTaskExecutorGateway tmGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setHeartbeatJobManagerFunction(
                                (taskManagerId, slotReport) -> {
                                    try {
                                        if (hasReceivedSlotOffers.isTriggered()) {
                                            Assertions.assertThat(
                                                            slotReport.getAllocatedSlotInfos())
                                                    .hasSize(1);
                                        } else {
                                            Assertions.assertThat(
                                                            slotReport.getAllocatedSlotInfos())
                                                    .isEmpty();
                                        }
                                    } catch (AssertionError violation) {
                                        // Hand the failure to the test thread.
                                        verificationFuture.completeExceptionally(violation);
                                    }

                                    // Completing is a no-op if a violation was already recorded.
                                    if (stopVerification.get()) {
                                        verificationFuture.complete(null);
                                    }
                                    return FutureUtils.completedVoidFuture();
                                })
                        .createTestingTaskExecutorGateway();
        rpcService.registerGateway(tmGateway.getAddress(), tmGateway);

        // Deliberately a fresh graph local to this test rather than the shared static one.
        final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();

        try (JobMaster jobMaster =
                new JobMasterBuilder(jobGraph, rpcService)
                        .withHeartbeatServices(new HeartbeatServices(5L, 1000L))
                        .withSlotPoolServiceSchedulerFactory(
                                DefaultSlotPoolServiceSchedulerFactory.create(
                                        new TestingSlotPoolFactory(hasReceivedSlotOffers),
                                        new DefaultSchedulerFactory()))
                        .createJobMaster()) {
            jobMaster.start();

            final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            final CompletableFuture<RegistrationResponse> registration =
                    gateway.registerTaskManager(
                            jobGraph.getJobID(),
                            TaskManagerRegistrationInformation.create(
                                    tmGateway.getAddress(), tmLocation, TestingUtils.zeroUUID()),
                            testingTimeout);

            // Block until the registration call has completed.
            registration.get();

            final SlotOffer offeredSlot = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
            final CompletableFuture<Collection<SlotOffer>> acceptedSlots =
                    gateway.offerSlots(
                            tmLocation.getResourceID(),
                            Collections.singleton(offeredSlot),
                            testingTimeout);
            Assertions.assertThat(acceptedSlots.get()).containsExactly(offeredSlot);

            // From now on the next heartbeat completes the verification future.
            stopVerification.set(true);

            // Fails here if any heartbeat saw an unexpected report.
            verificationFuture.get();
        }
    }

    /**
     * With the fast heartbeat services, checks that the JobMaster registers at the announced
     * resource manager with the expected IDs, that the resource manager is then told to
     * disconnect the job manager, and that two registration attempts are observed in total.
     */
    @Test
    void testHeartbeatTimeoutWithResourceManager() throws Exception {
        final String resourceManagerAddress = "rm";
        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        final ResourceID rmResourceId = new ResourceID(resourceManagerAddress);

        final TestingResourceManagerGateway rmGateway =
                new TestingResourceManagerGateway(
                        resourceManagerId, rmResourceId, resourceManagerAddress, "localhost");

        final CompletableFuture<Tuple3<JobMasterId, ResourceID, JobID>> registrationFuture =
                new CompletableFuture<>();
        final CompletableFuture<JobID> disconnectedJobFuture = new CompletableFuture<>();
        final CountDownLatch registrationAttempts = new CountDownLatch(2);

        rmGateway.setRegisterJobManagerFunction(
                (jobMasterId, resourceID, s, jobID) -> {
                    registrationFuture.complete(Tuple3.of(jobMasterId, resourceID, jobID));
                    registrationAttempts.countDown();
                    return CompletableFuture.completedFuture(
                            rmGateway.getJobMasterRegistrationSuccess());
                });
        rmGateway.setDisconnectJobManagerConsumer(
                tuple -> disconnectedJobFuture.complete(tuple.f0));

        rpcService.registerGateway(resourceManagerAddress, rmGateway);

        try (JobMaster jobMaster =
                new JobMasterBuilder(jobGraph, rpcService)
                        .withJobMasterId(this.jobMasterId)
                        .withResourceId(this.jmResourceId)
                        .withConfiguration(this.configuration)
                        .withHighAvailabilityServices(this.haServices)
                        .withHeartbeatServices(fastHeartbeatServices)
                        .createJobMaster()) {
            jobMaster.start();

            // Announce the resource manager as leader.
            this.rmLeaderRetrievalService.notifyListener(
                    resourceManagerAddress, resourceManagerId.toUUID());

            final Tuple3<JobMasterId, ResourceID, JobID> registrationInformation =
                    registrationFuture.get(
                            testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assertions.assertThat(registrationInformation.f0).isEqualTo(this.jobMasterId);
            Assertions.assertThat(registrationInformation.f1).isEqualTo(this.jmResourceId);
            Assertions.assertThat(registrationInformation.f2).isEqualTo(jobGraph.getJobID());

            final JobID disconnectedJob =
                    disconnectedJobFuture.get(
                            testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assertions.assertThat(disconnectedJob).isEqualTo(jobGraph.getJobID());

            // Wait for the second registration attempt.
            registrationAttempts.await();
        }
    }

    /**
     * Uses a resource manager whose heartbeat callback fails with a
     * {@link RecipientUnreachableException} and checks that the resource manager is told to
     * disconnect the job manager and that a second registration attempt is made.
     */
    @Test
    void testResourceManagerBecomesUnreachableTriggersDisconnect() throws Exception {
        final String resourceManagerAddress = "rm";
        final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
        final ResourceID rmResourceId = new ResourceID(resourceManagerAddress);

        final TestingResourceManagerGateway rmGateway =
                new TestingResourceManagerGateway(
                        resourceManagerId, rmResourceId, resourceManagerAddress, "localhost");

        final CompletableFuture<JobID> disconnectedJobFuture = new CompletableFuture<>();
        final CountDownLatch registrationAttempts = new CountDownLatch(2);

        // First registration succeeds immediately; the second one never completes.
        final ArrayDeque<CompletableFuture<RegistrationResponse>> connectionResponses =
                new ArrayDeque<>(2);
        connectionResponses.add(
                CompletableFuture.completedFuture(rmGateway.getJobMasterRegistrationSuccess()));
        connectionResponses.add(new CompletableFuture<>());

        rmGateway.setRegisterJobManagerFunction(
                (jobMasterId, resourceID, s, jobID) -> {
                    registrationAttempts.countDown();
                    return connectionResponses.poll();
                });
        rmGateway.setDisconnectJobManagerConsumer(
                tuple -> disconnectedJobFuture.complete(tuple.f0));
        rmGateway.setJobMasterHeartbeatFunction(
                ignored ->
                        FutureUtils.completedExceptionally(
                                new RecipientUnreachableException(
                                        "sender", "recipient", "resource manager is unreachable")));

        rpcService.registerGateway(resourceManagerAddress, rmGateway);

        try (JobMaster jobMaster =
                new JobMasterBuilder(jobGraph, rpcService)
                        .withJobMasterId(this.jobMasterId)
                        .withResourceId(this.jmResourceId)
                        .withConfiguration(this.configuration)
                        .withHighAvailabilityServices(this.haServices)
                        .withHeartbeatServices(heartbeatServices)
                        .createJobMaster()) {
            jobMaster.start();

            // Announce the resource manager as leader.
            this.rmLeaderRetrievalService.notifyListener(
                    resourceManagerAddress, resourceManagerId.toUUID());

            final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);

            // Keep sending resource manager heartbeats until the disconnect has been observed.
            CommonTestUtils.waitUntilCondition(
                    () -> {
                        gateway.heartbeatFromResourceManager(rmResourceId);
                        return disconnectedJobFuture.isDone();
                    },
                    50L);

            Assertions.assertThat(disconnectedJobFuture.join()).isEqualTo(jobGraph.getJobID());

            // Wait for the second registration attempt.
            registrationAttempts.await();
        }
    }

    /**
     * Starts a JobMaster for a job graph configured to restore from a savepoint with ID 42 and,
     * once a task has been submitted, checks that the latest checkpoint in the store carries
     * that ID.
     */
    @Test
    void testRestoringFromSavepoint() throws Exception {
        final long savepointId = 42L;
        final File savepointFile = this.createSavepoint(savepointId);

        final SavepointRestoreSettings restoreSettings =
                SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true);
        final JobGraph jobGraph = this.createJobGraphWithCheckpointing(restoreSettings);

        // Hand the JobMaster a checkpoint store the test can inspect afterwards.
        final StandaloneCompletedCheckpointStore checkpointStore =
                new StandaloneCompletedCheckpointStore(1);
        final CheckpointRecoveryFactory recoveryFactory =
                PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
                        maxCheckpoints -> checkpointStore);
        this.haServices.setCheckpointRecoveryFactory(recoveryFactory);

        try (JobMaster jobMaster =
                new JobMasterBuilder(jobGraph, rpcService)
                        .withHighAvailabilityServices(this.haServices)
                        .createJobMaster()) {
            jobMaster.start();

            final OneShotLatch taskSubmitted = new OneShotLatch();
            final TestingTaskExecutorGateway tmGateway =
                    new TestingTaskExecutorGatewayBuilder()
                            .setSubmitTaskConsumer(
                                    (taskDeploymentDescriptor, jobMasterId) -> {
                                        taskSubmitted.trigger();
                                        return CompletableFuture.completedFuture(
                                                Acknowledge.get());
                                    })
                            .createTestingTaskExecutorGateway();

            this.registerSlotsAtJobMaster(
                    1,
                    jobMaster.getSelfGateway(JobMasterGateway.class),
                    jobGraph.getJobID(),
                    tmGateway,
                    new LocalUnresolvedTaskManagerLocation());

            // Only inspect the store after the first task submission has been seen.
            taskSubmitted.await();

            final CompletedCheckpoint restoredCheckpoint = checkpointStore.getLatestCheckpoint();
            Assertions.assertThat(restoredCheckpoint).isNotNull();
            Assertions.assertThat(restoredCheckpoint.getCheckpointID()).isEqualTo(savepointId);
        }
    }

    /**
     * Checks that a completed checkpoint (ID 1) already present in the checkpoint store is still
     * the latest entry after a JobMaster has been created for a job graph that also carries
     * savepoint restore settings (savepoint ID 42).
     *
     * <p>The JobMaster is built with {@code haServices} so that the checkpoint recovery factory
     * installed on them is actually handed to it. Previously the factory was set on
     * {@code haServices} but those services were never passed to the builder, so the assertions
     * only re-read what the test itself had put into the store.
     * NOTE(review): this assumes the JobMaster obtains its checkpoint store from the supplied HA
     * services - confirm against JobMasterBuilder/JobMaster.
     */
    @Test
    void testCheckpointPrecedesSavepointRecovery() throws Exception {
        final long savepointId = 42L;
        final File savepointFile = this.createSavepoint(savepointId);

        final SavepointRestoreSettings savepointRestoreSettings =
                SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true);
        final JobGraph jobGraph = this.createJobGraphWithCheckpointing(savepointRestoreSettings);

        final long checkpointId = 1L;
        final CompletedCheckpoint completedCheckpoint =
                new CompletedCheckpoint(
                        jobGraph.getJobID(),
                        checkpointId,
                        1L,
                        1L,
                        Collections.emptyMap(),
                        null,
                        CheckpointProperties.forCheckpoint(
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        new DummyCheckpointStorageLocation(),
                        null);

        // Pre-populate the store with the checkpoint that must not be displaced.
        final StandaloneCompletedCheckpointStore completedCheckpointStore =
                new StandaloneCompletedCheckpointStore(1);
        completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                completedCheckpoint, new CheckpointsCleaner(), () -> {});
        final CheckpointRecoveryFactory testingCheckpointRecoveryFactory =
                PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
                        maxCheckpoints -> completedCheckpointStore);
        this.haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);

        try (JobMaster jobMaster =
                new JobMasterBuilder(jobGraph, rpcService)
                        .withHighAvailabilityServices(this.haServices)
                        .createJobMaster()) {
            final CompletedCheckpoint latestCheckpoint =
                    completedCheckpointStore.getLatestCheckpoint();
            Assertions.assertThat(latestCheckpoint).isNotNull();
            Assertions.assertThat(latestCheckpoint.getCheckpointID()).isEqualTo(checkpointId);
        }
    }

    /**
     * Announces a first resource manager as leader, waits for a registration attempt at it, then
     * announces a second one and waits for a registration attempt at that one as well.
     */
    @Test
    void testCloseUnestablishedResourceManagerConnection() throws Exception {
        try (JobMaster jobMaster =
                new JobMasterBuilder(jobGraph, rpcService)
                        .withConfiguration(this.configuration)
                        .withHighAvailabilityServices(this.haServices)
                        .createJobMaster()) {
            jobMaster.start();

            final TestingResourceManagerGateway firstRm =
                    this.createAndRegisterTestingResourceManagerGateway();
            final TestingResourceManagerGateway secondRm =
                    this.createAndRegisterTestingResourceManagerGateway();

            final OneShotLatch registeredAtFirstRm = new OneShotLatch();
            final OneShotLatch registeredAtSecondRm = new OneShotLatch();

            firstRm.setRegisterJobManagerFunction(
                    (jobMasterId, resourceID, s, jobID) -> {
                        registeredAtFirstRm.trigger();
                        return CompletableFuture.completedFuture(
                                firstRm.getJobMasterRegistrationSuccess());
                    });
            secondRm.setRegisterJobManagerFunction(
                    (jobMasterId, resourceID, s, jobID) -> {
                        registeredAtSecondRm.trigger();
                        return CompletableFuture.completedFuture(
                                secondRm.getJobMasterRegistrationSuccess());
                    });

            this.notifyResourceManagerLeaderListeners(firstRm);
            // A registration attempt at the first resource manager must have been seen ...
            registeredAtFirstRm.await();

            // ... before leadership moves on to the second one.
            this.notifyResourceManagerLeaderListeners(secondRm);
            registeredAtSecondRm.await();
        }
    }

    /**
     * Checks that after {@code disconnectResourceManager} is invoked on the JobMaster gateway, a
     * second registration carrying the same JobMaster ID arrives at the resource manager.
     */
    @Test
    void testReconnectionAfterDisconnect() throws Exception {
        try (JobMaster jobMaster =
                new JobMasterBuilder(jobGraph, rpcService)
                        .withJobMasterId(this.jobMasterId)
                        .withConfiguration(this.configuration)
                        .withHighAvailabilityServices(this.haServices)
                        .withHeartbeatServices(heartbeatServices)
                        .createJobMaster()) {
            jobMaster.start();

            final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            final TestingResourceManagerGateway rmGateway =
                    this.createAndRegisterTestingResourceManagerGateway();

            // Capacity one: each registration has to be taken before the next can be recorded.
            final ArrayBlockingQueue<JobMasterId> seenRegistrations = new ArrayBlockingQueue<>(1);
            rmGateway.setRegisterJobManagerFunction(
                    (jobMasterId, resourceID, s, jobID) -> {
                        seenRegistrations.offer(jobMasterId);
                        return CompletableFuture.completedFuture(
                                rmGateway.getJobMasterRegistrationSuccess());
                    });

            final ResourceManagerId rmId = rmGateway.getFencingToken();
            this.notifyResourceManagerLeaderListeners(rmGateway);

            final JobMasterId firstAttempt = seenRegistrations.take();
            Assertions.assertThat(firstAttempt).isEqualTo(this.jobMasterId);
            Assertions.assertThat(seenRegistrations).isEmpty();

            gateway.disconnectResourceManager(rmId, new FlinkException("Test exception"));

            // The next registration must again carry this JobMaster's ID.
            Assertions.assertThat(seenRegistrations.take()).isEqualTo(this.jobMasterId);
        }
    }

    /**
     * Announces the resource manager leader before the JobMaster is started and checks that a
     * registration carrying the JobMaster's ID arrives once it has been started.
     */
    @Test
    void testResourceManagerConnectionAfterStart() throws Exception {
        try (JobMaster jobMaster =
                new JobMasterBuilder(jobGraph, rpcService)
                        .withJobMasterId(this.jobMasterId)
                        .withConfiguration(this.configuration)
                        .withHighAvailabilityServices(this.haServices)
                        .withHeartbeatServices(heartbeatServices)
                        .createJobMaster()) {
            final TestingResourceManagerGateway rmGateway =
                    this.createAndRegisterTestingResourceManagerGateway();

            final ArrayBlockingQueue<JobMasterId> seenRegistrations = new ArrayBlockingQueue<>(1);
            rmGateway.setRegisterJobManagerFunction(
                    (jobMasterId, resourceID, s, jobID) -> {
                        seenRegistrations.offer(jobMasterId);
                        return CompletableFuture.completedFuture(
                                rmGateway.getJobMasterRegistrationSuccess());
                    });

            // The leader is announced first; the JobMaster is only started afterwards.
            this.notifyResourceManagerLeaderListeners(rmGateway);
            jobMaster.start();

            final JobMasterId firstAttempt = seenRegistrations.take();
            Assertions.assertThat(firstAttempt).isEqualTo(this.jobMasterId);
        }
    }

    /**
     * Runs the input split request scenario with the failover strategy set to {@code "region"}.
     * The splits expected after the failure are those of the first entry in the per-task list.
     */
    @Test
    @Tag(value="org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler")
    void testRequestNextInputSplitWithLocalFailover() throws Exception {
        this.configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
        this.runRequestNextInputSplitTest(splitsByTask -> splitsByTask.get(0));
    }

    /**
     * Runs the input split request scenario with the failover strategy set to {@code "full"} and
     * a fixed-delay restart of one attempt with zero delay. The splits expected after the failure
     * are all splits of all tasks, flattened into one collection.
     */
    @Test
    void testRequestNextInputSplitWithGlobalFailover() throws Exception {
        this.configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
        this.configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(0L));
        this.configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
        this.runRequestNextInputSplitTest(this::flattenCollection);
    }

    /**
     * Shared scenario for the input split tests: every execution of a source vertex requests its
     * input splits, the first execution is then reported as {@code FAILED}, and the splits handed
     * to the restarted first execution are compared with the expectation.
     *
     * @param expectedRemainingInputSplits maps the splits initially requested per task (in
     *     execution order) to the splits the restarted first execution is expected to receive
     * @throws Exception if the job master interaction or any assertion helper fails
     */
    private void runRequestNextInputSplitTest(Function<List<List<InputSplit>>, Collection<InputSplit>> expectedRemainingInputSplits) throws Exception {
        // Single source of truth for the sizes used below (previously repeated as bare literals).
        final int parallelism = 2;
        final int splitsPerTask = 2;
        final int totalSplits = parallelism * splitsPerTask;
        ArrayList<TestingInputSplit> allInputSplits = new ArrayList<TestingInputSplit>(totalSplits);
        for (int i = 0; i < totalSplits; ++i) {
            allInputSplits.add(new TestingInputSplit(i));
        }
        TestingInputSplitSource inputSplitSource = new TestingInputSplitSource(allInputSplits);
        JobVertex source = new JobVertex("source");
        source.setParallelism(parallelism);
        source.setInputSplitSource((InputSplitSource)inputSplitSource);
        source.setInvokableClass(AbstractInvokable.class);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 0L));
        JobGraph inputSplitJobGraph = JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(source).setExecutionConfig(executionConfig).build();
        try (JobMaster jobMaster = new JobMasterBuilder(inputSplitJobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            JobMasterTest.registerSlotsRequiredForJobExecution(jobMasterGateway, inputSplitJobGraph.getJobID(), parallelism);
            this.waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);
            JobVertexID sourceId = source.getID();
            List<AccessExecution> executions = JobMasterTest.getExecutions(jobMasterGateway, sourceId);
            ExecutionAttemptID initialAttemptId = executions.get(0).getAttemptId();
            // Request splitsPerTask splits for each execution and remember them per task.
            ArrayList<List<InputSplit>> inputSplitsPerTask = new ArrayList<List<InputSplit>>(parallelism);
            for (AccessExecution execution : executions) {
                inputSplitsPerTask.add(JobMasterTest.getInputSplits(splitsPerTask, this.getInputSplitSupplier(sourceId, jobMasterGateway, execution.getAttemptId())));
            }
            // Together the executions must have received every split exactly once.
            List<InputSplit> allRequestedInputSplits = this.flattenCollection(inputSplitsPerTask);
            Assertions.assertThat(allRequestedInputSplits).containsExactlyInAnyOrder((Object[])allInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS));
            // Fail the first execution and wait until everything is scheduled or deploying again.
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(initialAttemptId, ExecutionState.FAILED)).get();
            this.waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);
            ExecutionAttemptID restartedAttemptId = JobMasterTest.getFirstExecution(jobMasterGateway, sourceId).getAttemptId();
            List<InputSplit> inputSplits = this.getRemainingInputSplits(this.getInputSplitSupplier(sourceId, jobMasterGateway, restartedAttemptId));
            Assertions.assertThat(inputSplits).containsExactlyInAnyOrder((Object[])expectedRemainingInputSplits.apply(inputSplitsPerTask).toArray(EMPTY_TESTING_INPUT_SPLITS));
        }
    }

    /**
     * Concatenates the per-task split lists into one list, preserving the order of the outer
     * list and of each inner list.
     *
     * @param inputSplitsPerTask one list of input splits per task
     * @return a new list holding every split of every inner list
     */
    @Nonnull
    private List<InputSplit> flattenCollection(List<List<InputSplit>> inputSplitsPerTask) {
        List<InputSplit> flattened = new ArrayList<>();
        for (List<InputSplit> taskSplits : inputSplitsPerTask) {
            flattened.addAll(taskSplits);
        }
        return flattened;
    }

    /**
     * Builds a supplier that asks the given gateway for the next input split of the given vertex
     * and execution attempt each time it is invoked.
     *
     * @param jobVertexID vertex whose splits are requested
     * @param jobMasterGateway gateway the request is sent to
     * @param initialAttemptId execution attempt on whose behalf the split is requested
     * @return supplier delegating to {@code getInputSplit}
     */
    @Nonnull
    private Supplier<SerializedInputSplit> getInputSplitSupplier(JobVertexID jobVertexID, JobMasterGateway jobMasterGateway, ExecutionAttemptID initialAttemptId) {
        Supplier<SerializedInputSplit> nextSplitRequest =
                () -> JobMasterTest.getInputSplit(jobMasterGateway, jobVertexID, initialAttemptId);
        return nextSplitRequest;
    }

    /**
     * Waits until the job reports at least one execution and every reported execution is in
     * state {@code SCHEDULED} or {@code DEPLOYING}.
     *
     * @param jobMasterGateway gateway used to query the current executions
     * @throws Exception propagated from {@code CommonTestUtils.waitUntilCondition}
     */
    private void waitUntilAllExecutionsAreScheduledOrDeployed(JobMasterGateway jobMasterGateway) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            Collection<AccessExecution> currentExecutions = JobMasterTest.getExecutions(jobMasterGateway);
            if (currentExecutions.isEmpty()) {
                return false;
            }
            for (AccessExecution current : currentExecutions) {
                ExecutionState state = current.getState();
                if (state != ExecutionState.SCHEDULED && state != ExecutionState.DEPLOYING) {
                    return false;
                }
            }
            return true;
        });
    }

    /**
     * Returns the current execution attempt of the first task of the given vertex, asserting
     * that the vertex reports at least one execution.
     *
     * @param jobMasterGateway gateway used to query the executions
     * @param jobVertexId vertex to look up
     * @return the first entry of the vertex's execution list
     */
    private static AccessExecution getFirstExecution(JobMasterGateway jobMasterGateway, JobVertexID jobVertexId) {
        List<AccessExecution> vertexExecutions = JobMasterTest.getExecutions(jobMasterGateway, jobVertexId);
        int executionCount = vertexExecutions.size();
        Assertions.assertThat(executionCount).isGreaterThanOrEqualTo(1);
        return vertexExecutions.get(0);
    }

    /**
     * Collects the current execution attempt of every task vertex of every job vertex in the
     * archived execution graph requested from the gateway.
     *
     * @param jobMasterGateway gateway used to request the execution graph
     * @return a new list with one current execution attempt per task vertex
     */
    private static Collection<AccessExecution> getExecutions(JobMasterGateway jobMasterGateway) {
        ArchivedExecutionGraph graph = JobMasterTest.requestExecutionGraph(jobMasterGateway).getArchivedExecutionGraph();
        List<AccessExecution> currentAttempts = new ArrayList<>();
        graph.getAllVertices().values().forEach(jobVertex -> {
            for (AccessExecutionVertex taskVertex : jobVertex.getTaskVertices()) {
                currentAttempts.add(taskVertex.getCurrentExecutionAttempt());
            }
        });
        return currentAttempts;
    }

    /**
     * Collects the current execution attempt of every task vertex of one job vertex. If the
     * archived execution graph has no entry for the vertex id, the result is empty.
     *
     * @param jobMasterGateway gateway used to request the execution graph
     * @param jobVertexId vertex whose executions are wanted
     * @return a new list with one current execution attempt per task vertex, in task order
     */
    private static List<AccessExecution> getExecutions(JobMasterGateway jobMasterGateway, JobVertexID jobVertexId) {
        ArchivedExecutionGraph graph = JobMasterTest.requestExecutionGraph(jobMasterGateway).getArchivedExecutionGraph();
        List<AccessExecution> currentAttempts = new ArrayList<>();
        Optional.ofNullable(graph.getAllVertices().get(jobVertexId)).ifPresent(jobVertex -> {
            for (AccessExecutionVertex taskVertex : jobVertex.getTaskVertices()) {
                currentAttempts.add(taskVertex.getCurrentExecutionAttempt());
            }
        });
        return currentAttempts;
    }

    /**
     * Requests the job's {@link ExecutionGraphInfo} from the gateway and blocks until it is
     * available.
     *
     * @param jobMasterGateway gateway the request is sent to
     * @return the execution graph information returned by the gateway
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *     {@link ExecutionException} raised while waiting for the result
     */
    private static ExecutionGraphInfo requestExecutionGraph(JobMasterGateway jobMasterGateway) {
        try {
            return (ExecutionGraphInfo)jobMasterGateway.requestJob(testingTimeout).get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Pulls exactly {@code numberInputSplits} splits from the supplier, asserting that each
     * response is non-empty, and deserializes them with the system class loader.
     *
     * @param numberInputSplits number of splits to request
     * @param nextInputSplit source of serialized splits; invoked once per requested split
     * @return the deserialized splits in request order
     * @throws Exception if deserializing a split fails
     */
    @Nonnull
    private static List<InputSplit> getInputSplits(int numberInputSplits, Supplier<SerializedInputSplit> nextInputSplit) throws Exception {
        List<InputSplit> requestedSplits = new ArrayList<>(numberInputSplits);
        int outstanding = numberInputSplits;
        while (outstanding > 0) {
            SerializedInputSplit serialized = nextInputSplit.get();
            Assertions.assertThat(serialized.isEmpty()).isFalse();
            byte[] splitData = serialized.getInputSplitData();
            requestedSplits.add((InputSplit)InstantiationUtil.deserializeObject(splitData, ClassLoader.getSystemClassLoader()));
            --outstanding;
        }
        return requestedSplits;
    }

    /**
     * Drains the supplier: keeps requesting splits until a response is empty or deserializes to
     * {@code null}, and returns everything received before that point.
     *
     * @param nextInputSplit source of serialized splits
     * @return the deserialized splits in the order they were received
     * @throws Exception if deserializing a split fails
     */
    private List<InputSplit> getRemainingInputSplits(Supplier<SerializedInputSplit> nextInputSplit) throws Exception {
        List<InputSplit> remainingSplits = new ArrayList<>(16);
        while (true) {
            SerializedInputSplit serialized = nextInputSplit.get();
            if (serialized.isEmpty()) {
                break;
            }
            byte[] splitData = serialized.getInputSplitData();
            InputSplit split = (InputSplit)InstantiationUtil.deserializeObject(splitData, ClassLoader.getSystemClassLoader());
            if (split == null) {
                break;
            }
            remainingSplits.add(split);
        }
        return remainingSplits;
    }

    /**
     * Requests the next input split for the given vertex and execution attempt and blocks until
     * the gateway answers.
     *
     * @param jobMasterGateway gateway the request is sent to
     * @param jobVertexId vertex whose split is requested
     * @param attemptId execution attempt on whose behalf the split is requested
     * @return the serialized split returned by the gateway
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *     {@link ExecutionException} raised while waiting for the result
     */
    private static SerializedInputSplit getInputSplit(JobMasterGateway jobMasterGateway, JobVertexID jobVertexId, ExecutionAttemptID attemptId) {
        try {
            return (SerializedInputSplit)jobMasterGateway.requestNextInputSplit(jobVertexId, attemptId).get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Exercises {@code requestPartitionState}: a finished producer reports {@code FINISHED}; a
     * fresh {@code ResultPartitionID} and a fresh {@code IntermediateDataSetID} each fail with an
     * {@link IllegalArgumentException} root cause; and a known partition combined with a newly
     * created execution attempt id fails with {@code PartitionProducerDisposedException}.
     */
    @Test
    void testRequestPartitionState() throws Exception {
        JobGraph producerConsumerJobGraph = this.producerConsumerJobGraph();
        try (JobMaster jobMaster = new JobMasterBuilder(producerConsumerJobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .createJobMaster()) {
            jobMaster.start();
            // Completed with the first task deployment descriptor submitted to the task executor.
            CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>();
            TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                tddFuture.complete(taskDeploymentDescriptor);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).createTestingTaskExecutorGateway();
            LocalUnresolvedTaskManagerLocation taskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            Collection<SlotOffer> slotOffers = this.registerSlotsAtJobMaster(1, jobMasterGateway, producerConsumerJobGraph.getJobID(), testingTaskExecutorGateway, taskManagerLocation);
            Assertions.assertThat(slotOffers).hasSize(1);
            TaskDeploymentDescriptor tdd = tddFuture.get();
            Assertions.assertThat(tdd.getProducedPartitions()).hasSize(1);
            ResultPartitionDeploymentDescriptor partition = tdd.getProducedPartitions().iterator().next();
            ExecutionAttemptID executionAttemptId = tdd.getExecutionAttemptId();
            // Query with a cloned id rather than the instance taken from the descriptor.
            ExecutionAttemptID copiedExecutionAttemptId = InstantiationUtil.clone(executionAttemptId);
            // Report the producer as finished before asking for its partition state.
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(executionAttemptId, ExecutionState.FINISHED)).get();
            ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), copiedExecutionAttemptId);
            CompletableFuture<ExecutionState> partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), partitionId);
            Assertions.assertThat(partitionStateFuture.get()).isEqualTo(ExecutionState.FINISHED);
            // Known result id, freshly created result partition id.
            partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), new ResultPartitionID());
            Assertions.assertThatThrownBy(partitionStateFuture::get).hasRootCauseInstanceOf(IllegalArgumentException.class);
            // Freshly created intermediate data set id, known partition id.
            partitionStateFuture = jobMasterGateway.requestPartitionState(new IntermediateDataSetID(), partitionId);
            Assertions.assertThatThrownBy(partitionStateFuture::get).hasRootCauseInstanceOf(IllegalArgumentException.class);
            // Known partition, but paired with a newly created execution attempt id.
            partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), new ResultPartitionID(partition.getPartitionId(), ExecutionGraphTestUtils.createExecutionAttemptId()));
            Assertions.assertThatThrownBy(partitionStateFuture::get).hasRootCauseInstanceOf(PartitionProducerDisposedException.class);
        }
    }

    /**
     * Announces the given gateway to the ResourceManager leader retrieval service, using the
     * gateway's address and its fencing token converted to a {@link UUID}.
     *
     * @param testingResourceManagerGateway gateway to announce as leader
     */
    private void notifyResourceManagerLeaderListeners(TestingResourceManagerGateway testingResourceManagerGateway) {
        String leaderAddress = testingResourceManagerGateway.getAddress();
        UUID leaderSessionId = testingResourceManagerGateway.getFencingToken().toUUID();
        this.rmLeaderRetrievalService.notifyListener(leaderAddress, leaderSessionId);
    }

    /**
     * Triggers two savepoints against a scheduler whose savepoint future never completes: the
     * request with a 1 ms timeout must fail with a {@link TimeoutException} root cause, while the
     * request with {@code RpcUtils.INF_TIMEOUT} must still be pending afterwards.
     */
    @Test
    void testTriggerSavepointTimeout() throws Exception {
        // The scheduler returns a future that is never completed.
        TestingSchedulerNG testingSchedulerNG = TestingSchedulerNG.newBuilder()
                .setTriggerSavepointFunction((ignoredA, ignoredB, formatType) -> new CompletableFuture<>())
                .build();
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withFatalErrorHandler(this.testingFatalErrorHandler)
                .withSlotPoolServiceSchedulerFactory((SlotPoolServiceSchedulerFactory)DefaultSlotPoolServiceSchedulerFactory.create((SlotPoolServiceFactory)TestingSlotPoolServiceBuilder.newBuilder(), (SchedulerNGFactory)new TestingSchedulerNGFactory(testingSchedulerNG)))
                .createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            CompletableFuture<String> savepointFutureLowTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, SavepointFormatType.CANONICAL, Time.milliseconds(1L));
            CompletableFuture<String> savepointFutureHighTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, SavepointFormatType.CANONICAL, RpcUtils.INF_TIMEOUT);
            Assertions.assertThatThrownBy(() -> savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit())).hasRootCauseInstanceOf(TimeoutException.class);
            Assertions.assertThat(savepointFutureHighTimeout).isNotDone();
        }
    }

    /**
     * Registers a single slot, fails it via {@code failSlot}, and expects the task executor to
     * be asked to free that allocation and to be disconnected for the job.
     */
    @Test
    void testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception {
        JobGraph jobGraph = this.createSingleVertexJobWithRestartStrategy();
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .createJobMaster()) {
            // Completed by the task executor callbacks below.
            CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>();
            CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>();
            TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction((allocationID, throwable) -> {
                freedSlotFuture.complete(allocationID);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID)).createTestingTaskExecutorGateway();
            LocalUnresolvedTaskManagerLocation taskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            Collection<SlotOffer> slotOffers = this.registerSlotsAtJobMaster(1, jobMasterGateway, jobGraph.getJobID(), testingTaskExecutorGateway, taskManagerLocation);
            Assertions.assertThat(slotOffers).hasSize(1);
            AllocationID allocationId = slotOffers.iterator().next().getAllocationId();
            jobMasterGateway.failSlot(taskManagerLocation.getResourceID(), allocationId, new FlinkException("Fail allocation test exception"));
            Assertions.assertThat(freedSlotFuture.get()).isEqualTo(allocationId);
            Assertions.assertThat(disconnectTaskExecutorFuture.get()).isEqualTo(jobGraph.getJobID());
        }
    }

    /**
     * Registers a single slot while the partition tracker reports that it is tracking
     * partitions, fails the slot, and expects the allocation to be freed but the task executor
     * not to be disconnected.
     */
    @Test
    void testTaskExecutorNotReleasedOnFailedAllocationIfPartitionIsAllocated() throws Exception {
        // NOTE(review): this instance is never referenced below; kept because it is unclear from
        // here whether building it has required side effects.
        JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
        JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        LocalUnresolvedTaskManagerLocation taskManagerUnresolvedLocation = new LocalUnresolvedTaskManagerLocation();
        AtomicBoolean isTrackingPartitions = new AtomicBoolean(true);
        TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        partitionTracker.setIsTrackingPartitionsForFunction(ignored -> isTrackingPartitions.get());
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .withPartitionTrackerFactory(ignored -> partitionTracker)
                .createJobMaster()) {
            CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>();
            CompletableFuture<AllocationID> freedSlotFuture = new CompletableFuture<>();
            TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction((allocationID, throwable) -> {
                freedSlotFuture.complete(allocationID);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }).setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID)).createTestingTaskExecutorGateway();
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            Collection<SlotOffer> slotOffers = this.registerSlotsAtJobMaster(1, jobMasterGateway, jobGraph.getJobID(), testingTaskExecutorGateway, taskManagerUnresolvedLocation);
            Assertions.assertThat(slotOffers).hasSize(1);
            AllocationID allocationId = slotOffers.iterator().next().getAllocationId();
            jobMasterGateway.failSlot(taskManagerUnresolvedLocation.getResourceID(), allocationId, new FlinkException("Fail allocation test exception"));
            Assertions.assertThat(freedSlotFuture.get()).isEqualTo(allocationId);
            // Wait for one more gateway round trip before checking that no disconnect happened.
            jobMasterGateway.requestJobStatus(Time.seconds(5L)).get();
            Assertions.assertThat(disconnectTaskExecutorFuture).isNotDone();
        }
    }

    /**
     * Sends a sequence of values to two named global aggregates using a summing aggregate
     * function and checks the running total returned after each update; the totals of
     * {@code "agg1"} and {@code "agg2"} are expected to be independent.
     */
    @Test
    void testJobMasterAggregatesValuesCorrectly() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            AggregateFunction<Integer, Integer, Integer> aggregateFunction = this.createAggregateFunction();
            // The function is cleaned and serialized once; the same bytes accompany every update.
            ClosureCleaner.clean(aggregateFunction, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            byte[] serializedAggregateFunction = InstantiationUtil.serializeObject(aggregateFunction);
            CompletableFuture<Object> updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 1, serializedAggregateFunction);
            Assertions.assertThat(updateAggregateFuture.get()).isEqualTo(1);
            updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 2, serializedAggregateFunction);
            Assertions.assertThat(updateAggregateFuture.get()).isEqualTo(3);
            updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 3, serializedAggregateFunction);
            Assertions.assertThat(updateAggregateFuture.get()).isEqualTo(6);
            updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg1", 4, serializedAggregateFunction);
            Assertions.assertThat(updateAggregateFuture.get()).isEqualTo(10);
            // A second aggregate name starts from its own accumulator.
            updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg2", 10, serializedAggregateFunction);
            Assertions.assertThat(updateAggregateFuture.get()).isEqualTo(10);
            updateAggregateFuture = jobMasterGateway.updateGlobalAggregate("agg2", 23, serializedAggregateFunction);
            Assertions.assertThat(updateAggregateFuture.get()).isEqualTo(33);
        }
    }

    /**
     * Creates an aggregate function over integers that starts at zero, adds each incoming value
     * to the accumulator and reports the accumulator itself as the result.
     *
     * @return a summing {@link AggregateFunction}
     */
    private AggregateFunction<Integer, Integer, Integer> createAggregateFunction() {
        return new AggregateFunction<Integer, Integer, Integer>(){

            @Override
            public Integer createAccumulator() {
                return 0;
            }

            @Override
            public Integer add(Integer value, Integer accumulator) {
                int sum = accumulator + value;
                return sum;
            }

            @Override
            public Integer getResult(Integer accumulator) {
                return accumulator;
            }

            /** Merging two accumulators is the same summation as adding a value. */
            @Override
            public Integer merge(Integer a, Integer b) {
                return this.add(a, b);
            }
        };
    }

    /**
     * Creates a new {@code TestingResourceManagerGateway} and registers it at the RPC service
     * under its own address.
     *
     * @return the registered gateway
     */
    @Nonnull
    private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGateway() {
        TestingResourceManagerGateway gateway = new TestingResourceManagerGateway();
        String gatewayAddress = gateway.getAddress();
        rpcService.registerGateway(gatewayAddress, (RpcGateway)gateway);
        return gateway;
    }

    /**
     * Runs the task-executor-termination scenario where the termination is signalled by
     * explicitly disconnecting the task manager from the job master.
     */
    @Test
    void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
        this.runJobFailureWhenTaskExecutorTerminatesTest(heartbeatServices, (localTaskManagerLocation, jobMasterGateway) -> {
            ResourceID taskManagerId = localTaskManagerLocation.getResourceID();
            jobMasterGateway.disconnectTaskManager(taskManagerId, new FlinkException("Test disconnectTaskManager exception."));
        });
    }

    /**
     * Runs the task-executor-termination scenario where the termination is signalled by
     * manually triggering a heartbeat timeout between the job master and the task manager.
     */
    @Test
    void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
        TestingHeartbeatServices manualHeartbeats = new TestingHeartbeatServices(1000L, 5000000L);
        this.runJobFailureWhenTaskExecutorTerminatesTest(manualHeartbeats, (localTaskManagerLocation, jobMasterGateway) -> {
            ResourceID taskManagerId = localTaskManagerLocation.getResourceID();
            manualHeartbeats.triggerHeartbeatTimeout(this.jmResourceId, taskManagerId);
        });
    }

    /**
     * Registers a task manager under a newly created {@link JobID} (not the job graph's id) and
     * expects the response to be a {@code JMTMRegistrationRejection}.
     */
    @Test
    void testJobMasterRejectsTaskExecutorRegistrationIfJobIdsAreNotEqual() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
            jobMaster.start();
            TaskManagerRegistrationInformation registrationInformation = TaskManagerRegistrationInformation.create("foobar", new LocalUnresolvedTaskManagerLocation(), TestingUtils.zeroUUID());
            CompletableFuture<RegistrationResponse> registrationResponse = jobMaster.registerTaskManager(new JobID(), registrationInformation, testingTimeout);
            Assertions.assertThat(registrationResponse.get()).isInstanceOf(JMTMRegistrationRejection.class);
        }
    }

    /**
     * Sends the same task manager registration twice and expects both responses to be a
     * {@code JMTMRegistrationSuccess}.
     */
    @Test
    void testJobMasterAcknowledgesDuplicateTaskExecutorRegistrations() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
            TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
            rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), (RpcGateway)testingTaskExecutorGateway);
            jobMaster.start();
            // One registration object reused for both calls, i.e. same address, location and session id.
            TaskManagerRegistrationInformation taskManagerRegistrationInformation = TaskManagerRegistrationInformation.create(testingTaskExecutorGateway.getAddress(), new LocalUnresolvedTaskManagerLocation(), UUID.randomUUID());
            CompletableFuture<RegistrationResponse> firstRegistrationResponse = jobMaster.registerTaskManager(jobGraph.getJobID(), taskManagerRegistrationInformation, testingTimeout);
            CompletableFuture<RegistrationResponse> secondRegistrationResponse = jobMaster.registerTaskManager(jobGraph.getJobID(), taskManagerRegistrationInformation, testingTimeout);
            Assertions.assertThat(firstRegistrationResponse.get()).isInstanceOf(JMTMRegistrationSuccess.class);
            Assertions.assertThat(secondRegistrationResponse.get()).isInstanceOf(JMTMRegistrationSuccess.class);
        }
    }

    /**
     * Registers two task executors that share one task manager location but use different
     * session ids, and expects the first task executor to be disconnected once the second
     * registration has succeeded.
     */
    @Test
    void testJobMasterDisconnectsOldTaskExecutorIfNewSessionIsSeen() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).createJobMaster()) {
            // Completed when the first task executor is told to disconnect from the job manager.
            CompletableFuture<Void> firstTaskExecutorDisconnectedFuture = new CompletableFuture<>();
            TestingTaskExecutorGateway firstTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress("firstTaskExecutor").setDisconnectJobManagerConsumer((jobID, throwable) -> firstTaskExecutorDisconnectedFuture.complete(null)).createTestingTaskExecutorGateway();
            TestingTaskExecutorGateway secondTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress("secondTaskExecutor").createTestingTaskExecutorGateway();
            rpcService.registerGateway(firstTaskExecutorGateway.getAddress(), (RpcGateway)firstTaskExecutorGateway);
            rpcService.registerGateway(secondTaskExecutorGateway.getAddress(), (RpcGateway)secondTaskExecutorGateway);
            jobMaster.start();
            // Both registrations use this same location; only address and session id differ.
            LocalUnresolvedTaskManagerLocation taskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            UUID firstTaskManagerSessionId = UUID.randomUUID();
            CompletableFuture<RegistrationResponse> firstRegistrationResponse = jobMaster.registerTaskManager(jobGraph.getJobID(), TaskManagerRegistrationInformation.create(firstTaskExecutorGateway.getAddress(), taskManagerLocation, firstTaskManagerSessionId), testingTimeout);
            Assertions.assertThat(firstRegistrationResponse.get()).isInstanceOf(JMTMRegistrationSuccess.class);
            UUID secondTaskManagerSessionId = UUID.randomUUID();
            CompletableFuture<RegistrationResponse> secondRegistrationResponse = jobMaster.registerTaskManager(jobGraph.getJobID(), TaskManagerRegistrationInformation.create(secondTaskExecutorGateway.getAddress(), taskManagerLocation, secondTaskManagerSessionId), testingTimeout);
            Assertions.assertThat(secondRegistrationResponse.get()).isInstanceOf(JMTMRegistrationSuccess.class);
            // Blocks until the disconnect callback of the first task executor has fired.
            firstTaskExecutorDisconnectedFuture.get();
        }
    }

    /**
     * Closes a JobMaster whose scheduler's close future is still pending: the JobMaster's
     * termination future must time out, and must complete once the scheduler's future is
     * completed.
     */
    @Test
    void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Exception {
        // Handed out by the scheduler's closeAsync; completed manually further below.
        CompletableFuture<Void> schedulerTerminationFuture = new CompletableFuture<>();
        TestingSchedulerNG testingSchedulerNG = TestingSchedulerNG.newBuilder().setCloseAsyncSupplier(() -> schedulerTerminationFuture).build();
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withSlotPoolServiceSchedulerFactory((SlotPoolServiceSchedulerFactory)DefaultSlotPoolServiceSchedulerFactory.create((SlotPoolServiceFactory)TestingSlotPoolServiceBuilder.newBuilder(), (SchedulerNGFactory)new TestingSchedulerNGFactory(testingSchedulerNG)))
                .createJobMaster()) {
            jobMaster.start();
            CompletableFuture<Void> jobMasterTerminationFuture = jobMaster.closeAsync();
            Assertions.assertThatThrownBy(() -> jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS))
                    .as("Expected TimeoutException because the JobMaster should not terminate.")
                    .isInstanceOf(TimeoutException.class);
            schedulerTerminationFuture.complete(null);
            jobMasterTerminationFuture.get();
        }
    }

    /**
     * Brings the job to {@code RUNNING}, disconnects its task manager so that it goes to
     * {@code RESTARTING} (fixed-delay restart with a one day delay), and checks that slots
     * offered by a second task manager are still accepted in that state.
     */
    @Test
    void testJobMasterAcceptsSlotsWhenJobIsRestarting() throws Exception {
        this.configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        this.configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofDays(1L));
        // Number of slots registered per task manager and expected to be accepted.
        final int numberSlots = 1;
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService).withConfiguration(this.configuration).createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            LocalUnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
            this.registerSlotsAtJobMaster(numberSlots, jobMasterGateway, jobGraph.getJobID(), new TestingTaskExecutorGatewayBuilder().setAddress("firstTaskManager").createTestingTaskExecutorGateway(), unresolvedTaskManagerLocation);
            CommonTestUtils.waitUntilCondition(() -> jobMasterGateway.requestJobStatus(testingTimeout).get() == JobStatus.RUNNING);
            jobMasterGateway.disconnectTaskManager(unresolvedTaskManagerLocation.getResourceID(), new FlinkException("Test exception."));
            CommonTestUtils.waitUntilCondition(() -> jobMasterGateway.requestJobStatus(testingTimeout).get() == JobStatus.RESTARTING);
            Assertions.assertThat(this.registerSlotsAtJobMaster(numberSlots, jobMasterGateway, jobGraph.getJobID(), new TestingTaskExecutorGatewayBuilder().setAddress("secondTaskManager").createTestingTaskExecutorGateway(), new LocalUnresolvedTaskManagerLocation())).hasSize(numberSlots);
        }
    }

    /**
     * Runs a two-task batch job on two slots of one task manager, reports one execution as
     * {@code FINISHED}, then blocks the task manager's node and expects exactly one free-slot
     * request to have reached the task executor.
     */
    @Test
    void testBlockResourcesWillTriggerReleaseFreeSlots() throws Exception {
        JobVertex jobVertex = new JobVertex("jobVertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(2);
        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
        LocalUnresolvedTaskManagerLocation taskManagerLocation = new LocalUnresolvedTaskManagerLocation();
        // The first free-slot call completes future 1, a second call would complete future 2.
        CompletableFuture<AllocationID> freedSlotFuture1 = new CompletableFuture<>();
        CompletableFuture<AllocationID> freedSlotFuture2 = new CompletableFuture<>();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction((allocationID, throwable) -> {
            if (!freedSlotFuture1.isDone()) {
                freedSlotFuture1.complete(allocationID);
            } else if (!freedSlotFuture2.isDone()) {
                freedSlotFuture2.complete(allocationID);
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).createTestingTaskExecutorGateway();
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .withBlocklistHandlerFactory((BlocklistHandler.Factory)new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L)))
                .createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getGateway();
            Collection<SlotOffer> slotOffers = this.registerSlotsAtJobMaster(2, jobMasterGateway, jobGraph.getJobID(), testingTaskExecutorGateway, taskManagerLocation);
            Assertions.assertThat(slotOffers).hasSize(2);
            this.waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);
            // Report one of the two executions as finished.
            ExecutionAttemptID finishedAttemptId = JobMasterTest.getExecutions(jobMasterGateway).iterator().next().getAttemptId();
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(finishedAttemptId, ExecutionState.FINISHED)).get();
            BlockedNode blockedNode = new BlockedNode(taskManagerLocation.getNodeId(), "Test cause", System.currentTimeMillis());
            jobMasterGateway.notifyNewBlockedNodes(Collections.singleton(blockedNode)).get();
            Assertions.assertThat(freedSlotFuture1).isDone();
            Assertions.assertThat(freedSlotFuture2).isNotDone();
        }
    }

    /**
     * Checks which resource manager gateway receives blocked-node notifications. While gateway A
     * is announced as leader it must receive the first blocked node. After gateway B is announced
     * and a second node is blocked, B's first notification must contain both nodes and A must not
     * have received a second notification.
     */
    @Test
    void testNewlyAddedBlockedNodesWillBeSynchronizedToResourceManager() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withConfiguration(this.configuration)
                .withHighAvailabilityServices(this.haServices)
                .withBlocklistHandlerFactory((BlocklistHandler.Factory)new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L)))
                .createJobMaster()) {
            jobMaster.start();
            JobMasterGateway gateway = jobMaster.getGateway();

            // Resource manager A: registration signal plus its first two notifications.
            CompletableFuture<Void> registeredAtA = new CompletableFuture<Void>();
            CompletableFuture<Collection<BlockedNode>> firstBatchAtA = new CompletableFuture<Collection<BlockedNode>>();
            CompletableFuture<Collection<BlockedNode>> secondBatchAtA = new CompletableFuture<Collection<BlockedNode>>();
            TestingResourceManagerGateway resourceManagerA = this.createResourceManagerGateway(firstBatchAtA, secondBatchAtA, registeredAtA);

            // Resource manager B: only its first notification is inspected.
            CompletableFuture<Void> registeredAtB = new CompletableFuture<Void>();
            CompletableFuture<Collection<BlockedNode>> firstBatchAtB = new CompletableFuture<Collection<BlockedNode>>();
            TestingResourceManagerGateway resourceManagerB = this.createResourceManagerGateway(firstBatchAtB, new CompletableFuture<Collection<BlockedNode>>(), registeredAtB);

            // Phase 1: A is the announced leader and sees the first blocked node.
            this.notifyResourceManagerLeaderListeners(resourceManagerA);
            registeredAtA.get();
            BlockedNode nodeOne = new BlockedNode("node1", "Test exception", Long.MAX_VALUE);
            gateway.notifyNewBlockedNodes(Collections.singleton(nodeOne)).get();
            Assertions.assertThat(firstBatchAtA.get()).containsExactly((Object[])new BlockedNode[]{nodeOne});

            // Phase 2: B is announced; it must be told about both nodes, A about nothing new.
            this.notifyResourceManagerLeaderListeners(resourceManagerB);
            registeredAtB.get();
            BlockedNode nodeTwo = new BlockedNode("node2", "Test exception", Long.MAX_VALUE);
            gateway.notifyNewBlockedNodes(Collections.singleton(nodeTwo)).get();
            Assertions.assertThat(firstBatchAtB.get()).containsExactlyInAnyOrder((Object[])new BlockedNode[]{nodeOne, nodeTwo});
            Assertions.assertThat(secondBatchAtA).isNotDone();
        }
    }

    /**
     * Builds a {@link TestingResourceManagerGateway}, registers it with the shared RPC service and
     * wires it to the given futures.
     *
     * @param firstReceivedBlockedNodeFuture completed with the first blocked-node notification
     * @param secondReceivedBlockedNodeFuture completed with the next notification that arrives
     *     once the first future is done; later notifications are only acknowledged
     * @param jobManagerRegistrationFuture completed when a job manager registration call arrives
     * @return the configured gateway
     */
    private TestingResourceManagerGateway createResourceManagerGateway(CompletableFuture<Collection<BlockedNode>> firstReceivedBlockedNodeFuture, CompletableFuture<Collection<BlockedNode>> secondReceivedBlockedNodeFuture, CompletableFuture<Void> jobManagerRegistrationFuture) {
        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        rpcService.registerGateway(resourceManagerGateway.getAddress(), (RpcGateway)resourceManagerGateway);

        // Route notifications to the first future that is still open, then acknowledge.
        resourceManagerGateway.setNotifyNewBlockedNodesFunction(blockedNodes -> {
            if (firstReceivedBlockedNodeFuture.isDone()) {
                if (!secondReceivedBlockedNodeFuture.isDone()) {
                    secondReceivedBlockedNodeFuture.complete((Collection<BlockedNode>)blockedNodes);
                }
            } else {
                firstReceivedBlockedNodeFuture.complete((Collection<BlockedNode>)blockedNodes);
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        });

        // Signal the registration and answer with the gateway's own success response.
        QuadFunction<JobMasterId, ResourceID, String, JobID, CompletableFuture<RegistrationResponse>> registerJobManager =
                (unusedMasterId, unusedResourceId, unusedAddress, unusedJobId) -> {
                    jobManagerRegistrationFuture.complete(null);
                    return CompletableFuture.completedFuture(resourceManagerGateway.getJobMasterRegistrationSuccess());
                };
        resourceManagerGateway.setRegisterJobManagerFunction(registerJobManager);
        return resourceManagerGateway;
    }

    /**
     * Brings a single no-op job to RUNNING on one slot of a registered task executor, invokes the
     * supplied callback and asserts that the job reaches {@code JobStatus.FAILED}.
     *
     * @param heartbeatServices heartbeat services passed to the {@link JobMasterBuilder}
     * @param jobReachedRunningState callback invoked with the task manager location and the job
     *     master gateway once the task has been reported as RUNNING; judging by the method name
     *     it is expected to make the task executor terminate
     * @throws Exception if any of the awaited futures fails or the wait is interrupted
     */
    private void runJobFailureWhenTaskExecutorTerminatesTest(HeartbeatServices heartbeatServices, BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> jobReachedRunningState) throws Exception {
        JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions();
        try (JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService)
                .withResourceId(this.jmResourceId)
                .withHighAvailabilityServices(this.haServices)
                .withHeartbeatServices(heartbeatServices)
                .withOnCompletionActions(onCompletionActions)
                .createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = (JobMasterGateway)jobMaster.getSelfGateway(JobMasterGateway.class);
            LocalUnresolvedTaskManagerLocation taskManagerUnresolvedLocation = new LocalUnresolvedTaskManagerLocation();

            // Parameterized so the attempt id can be read back without an unchecked downcast.
            CompletableFuture<ExecutionAttemptID> taskDeploymentFuture = new CompletableFuture<>();
            TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                    .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
                        taskDeploymentFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId());
                        return CompletableFuture.completedFuture(Acknowledge.get());
                    })
                    .createTestingTaskExecutorGateway();

            Collection<SlotOffer> slotOffers = this.registerSlotsAtJobMaster(1, jobMasterGateway, jobGraph.getJobID(), taskExecutorGateway, taskManagerUnresolvedLocation);
            Assertions.assertThat(slotOffers).hasSize(1);

            // Wait for the task submission, then walk the execution through INITIALIZING to RUNNING.
            ExecutionAttemptID executionAttemptId = taskDeploymentFuture.get();
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(executionAttemptId, ExecutionState.INITIALIZING)).get();
            jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(executionAttemptId, ExecutionState.RUNNING)).get();

            jobReachedRunningState.accept(taskManagerUnresolvedLocation, jobMasterGateway);

            ArchivedExecutionGraph archivedExecutionGraph = onCompletionActions.getJobReachedGloballyTerminalStateFuture().get().getArchivedExecutionGraph();
            Assertions.assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
        }
    }

    /**
     * Registers the task executor gateway with the shared RPC service, registers the task manager
     * at the job master, and offers {@code numberSlots} freshly created slots.
     *
     * <p>Each offer gets a new {@link AllocationID}, slot indices {@code 0..numberSlots-1} and
     * {@code ResourceProfile.ANY}.
     *
     * @param numberSlots number of slots to offer
     * @param jobMasterGateway gateway of the job master under test
     * @param jobId id of the job the task manager registers for
     * @param taskExecutorGateway gateway to register under its own address
     * @param unresolvedTaskManagerLocation location used for the registration and the slot offer
     * @return the value produced by the job master's {@code offerSlots} call
     * @throws ExecutionException if the registration or the slot offer future fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private Collection<SlotOffer> registerSlotsAtJobMaster(int numberSlots, JobMasterGateway jobMasterGateway, JobID jobId, TaskExecutorGateway taskExecutorGateway, UnresolvedTaskManagerLocation unresolvedTaskManagerLocation) throws ExecutionException, InterruptedException {
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        jobMasterGateway.registerTaskManager(
                jobId,
                TaskManagerRegistrationInformation.create((String)taskExecutorGateway.getAddress(), unresolvedTaskManagerLocation, (UUID)TestingUtils.zeroUUID()),
                testingTimeout).get();

        // Parameterized collection instead of a raw one, so the offer call is type-checked.
        Collection<SlotOffer> slotOffers = IntStream.range(0, numberSlots)
                .mapToObj(index -> new SlotOffer(new AllocationID(), index, ResourceProfile.ANY))
                .collect(Collectors.toList());
        return jobMasterGateway.offerSlots(unresolvedTaskManagerLocation.getResourceID(), slotOffers, testingTimeout).get();
    }

    /**
     * Creates a batch job graph with a "Producer" vertex feeding a "Consumer" vertex through a
     * POINTWISE, BLOCKING connection. Both vertices run {@link NoOpInvokable}.
     *
     * @return the two-vertex batch job graph
     */
    private JobGraph producerConsumerJobGraph() {
        JobVertex upstream = new JobVertex("Producer");
        JobVertex downstream = new JobVertex("Consumer");

        upstream.setInvokableClass(NoOpInvokable.class);
        downstream.setInvokableClass(NoOpInvokable.class);

        downstream.connectNewDataSetAsInput(upstream, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        return JobGraphTestUtils.batchJobGraph(upstream, downstream);
    }

    /**
     * Creates a temp file with a random UUID prefix and an empty suffix inside the test's
     * temporary folder, and passes it to {@code TestUtils.createSavepointWithOperatorState}
     * together with the savepoint id and no operator ids.
     *
     * @param savepointId id forwarded to the savepoint helper
     * @return the file returned by the savepoint helper
     * @throws IOException if the temp file cannot be created or the helper fails
     */
    private File createSavepoint(long savepointId) throws IOException {
        String filePrefix = UUID.randomUUID().toString();
        Path savepointPath = Files.createTempFile(this.temporaryFolder, filePrefix, "");
        return TestUtils.createSavepointWithOperatorState(savepointPath.toFile(), savepointId, new OperatorID[0]);
    }

    /**
     * Builds a job graph from a single "source" vertex (parallelism 1, {@link NoOpInvokable})
     * using {@code TestUtils.createJobGraphFromJobVerticesWithCheckpointing}.
     *
     * @param savepointRestoreSettings restore settings forwarded to the helper
     * @return the job graph produced by the helper
     */
    @Nonnull
    private JobGraph createJobGraphWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings) {
        JobVertex sourceVertex = new JobVertex("source");
        sourceVertex.setParallelism(1);
        sourceVertex.setInvokableClass(NoOpInvokable.class);
        return TestUtils.createJobGraphFromJobVerticesWithCheckpointing(savepointRestoreSettings, sourceVertex);
    }

    /**
     * Creates a single no-op job graph whose execution config uses a fixed-delay restart strategy
     * with {@code Integer.MAX_VALUE} attempts and a delay of zero.
     *
     * @return the configured job graph
     * @throws IOException declared by this helper; propagated from the calls it makes
     */
    private JobGraph createSingleVertexJobWithRestartStrategy() throws IOException {
        ExecutionConfig restartingConfig = new ExecutionConfig();
        restartingConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));

        JobGraph singleVertexGraph = JobGraphTestUtils.singleNoOpJobGraph();
        singleVertexGraph.setExecutionConfig(restartingConfig);
        return singleVertexGraph;
    }

    /**
     * Registers a task executor and offers {@code numSlots} slots for the given job through
     * {@code JobMasterTestUtils.registerTaskExecutorAndOfferSlots}.
     *
     * <p>The task executor answers every cancel request by reporting the execution as CANCELED
     * to the job master and then acknowledging the request.
     *
     * @param jobMasterGateway gateway of the job master under test
     * @param jobId id of the job the slots are offered for
     * @param numSlots number of slots to offer
     * @throws ExecutionException if the registration helper's future fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private static void registerSlotsRequiredForJobExecution(JobMasterGateway jobMasterGateway, JobID jobId, int numSlots) throws ExecutionException, InterruptedException {
        TestingTaskExecutorGateway cancellingTaskExecutor = new TestingTaskExecutorGatewayBuilder()
                .setCancelTaskFunction(attemptId -> {
                    // Fire-and-forget: the result of the state update is not awaited.
                    TaskExecutionState canceledState = new TaskExecutionState(attemptId, ExecutionState.CANCELED);
                    jobMasterGateway.updateTaskExecutionState(canceledState);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .createTestingTaskExecutorGateway();
        JobMasterTestUtils.registerTaskExecutorAndOfferSlots(rpcService, jobMasterGateway, jobId, numSlots, cancellingTaskExecutor, testingTimeout);
    }

    /**
     * Placeholder {@link CompletedCheckpointStorageLocation}: both accessors return {@code null}
     * and disposing the location does nothing.
     */
    private static final class DummyCheckpointStorageLocation implements CompletedCheckpointStorageLocation {
        private static final long serialVersionUID = 164095949572620688L;

        /** Only the enclosing test class creates instances. */
        private DummyCheckpointStorageLocation() {
        }

        /** @return always {@code null} */
        public String getExternalPointer() {
            return null;
        }

        /** @return always {@code null} */
        public StreamStateHandle getMetadataHandle() {
            return null;
        }

        /** No-op; there is nothing to dispose. */
        public void disposeStorageLocation() throws IOException {
        }
    }

    /** {@link InputSplit} identified solely by its split number; equality and hash follow that number. */
    private static final class TestingInputSplit implements InputSplit {
        private static final long serialVersionUID = -5404803705463116083L;

        private final int splitNumber;

        TestingInputSplit(int number) {
            this.splitNumber = number;
        }

        /** @return the number given at construction */
        public int getSplitNumber() {
            return this.splitNumber;
        }

        /** Two splits are equal when they carry the same split number. */
        public boolean equals(Object o) {
            // The class is final, so instanceof matches exactly the same objects as a
            // getClass() comparison; it also rejects null.
            if (!(o instanceof TestingInputSplit)) {
                return false;
            }
            TestingInputSplit other = (TestingInputSplit)o;
            return other.splitNumber == this.splitNumber;
        }

        public int hashCode() {
            return Objects.hash(this.splitNumber);
        }
    }

    /** {@link InputSplitSource} that serves a fixed list of {@link TestingInputSplit}s. */
    private static final class TestingInputSplitSource implements InputSplitSource<TestingInputSplit> {
        private static final long serialVersionUID = -2344684048759139086L;

        /** The splits handed out by this source; the list is stored as given, not copied. */
        private final List<TestingInputSplit> inputSplits;

        private TestingInputSplitSource(List<TestingInputSplit> inputSplits) {
            this.inputSplits = inputSplits;
        }

        /**
         * Returns the configured splits as an array.
         *
         * @param minNumSplits ignored
         */
        public TestingInputSplit[] createInputSplits(int minNumSplits) {
            TestingInputSplit[] splits = this.inputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS);
            return splits;
        }

        /** Wraps the given splits in a {@link DefaultInputSplitAssigner}. */
        public InputSplitAssigner getInputSplitAssigner(TestingInputSplit[] inputSplits) {
            InputSplit[] splitsToAssign = (InputSplit[])inputSplits;
            return new DefaultInputSplitAssigner(splitsToAssign);
        }
    }

    /**
     * Test double implementing both {@link SlotPool} and {@link SlotPoolService}.
     *
     * <p>It keeps, per registered task manager, the slots that were offered to it, and triggers a
     * latch whenever slots are offered. Operations this double does not implement throw
     * {@link UnsupportedOperationException}; slot requests return futures that this class never
     * completes.
     */
    private static final class TestingSlotPool
    implements SlotPool,
    SlotPoolService {
        private final JobID jobId;
        /** Triggered on every call to {@link #offerSlots}. */
        private final OneShotLatch hasReceivedSlotOffers;
        /** Offered slots, keyed by the resource id of the task manager that offered them. */
        private final Map<ResourceID, Collection<SlotInfo>> registeredSlots;

        private TestingSlotPool(JobID jobId, OneShotLatch hasReceivedSlotOffers) {
            this.jobId = jobId;
            this.hasReceivedSlotOffers = hasReceivedSlotOffers;
            this.registeredSlots = new HashMap<ResourceID, Collection<SlotInfo>>(16);
        }

        /** No-op. */
        public void start(JobMasterId jobMasterId, String newJobManagerAddress, ComponentMainThreadExecutor jmMainThreadScheduledExecutor) {
        }

        /** Forgets all registered task managers and their slots. */
        public void close() {
            this.clear();
        }

        private void clear() {
            this.registeredSlots.clear();
        }

        public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        public void disconnectResourceManager() {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        /**
         * Registers the task manager with an empty slot collection unless it is already known.
         *
         * @return always {@code true}
         */
        public boolean registerTaskManager(ResourceID resourceID) {
            this.registeredSlots.computeIfAbsent(resourceID, ignored -> new ArrayList<SlotInfo>(16));
            return true;
        }

        /**
         * Drops the task manager together with its recorded slots.
         *
         * @return always {@code true}, whether or not the task manager was registered
         */
        public boolean releaseTaskManager(ResourceID resourceId, Exception cause) {
            this.registeredSlots.remove(resourceId);
            return true;
        }

        public void releaseFreeSlotsOnTaskManager(ResourceID taskManagerId, Exception cause) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        /**
         * Triggers the latch and records every offer as a slot of the given task manager.
         * Physical slot numbers continue from the number of slots already recorded for it.
         *
         * @return the offers unchanged, i.e. all of them are accepted
         * @throws FlinkRuntimeException if the task manager has not been registered
         */
        public Collection<SlotOffer> offerSlots(TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collection<SlotOffer> offers) {
            // The latch fires before the registration check, so it is triggered even when the
            // call fails below.
            this.hasReceivedSlotOffers.trigger();
            Collection<SlotInfo> slotInfos = Optional.ofNullable(this.registeredSlots.get(taskManagerLocation.getResourceID())).orElseThrow(() -> new FlinkRuntimeException("TaskManager not registered."));
            int slotIndex = slotInfos.size();
            for (SlotOffer offer : offers) {
                slotInfos.add((SlotInfo)new SimpleSlotContext(offer.getAllocationId(), taskManagerLocation, slotIndex, taskManagerGateway));
                ++slotIndex;
            }
            return offers;
        }

        public Optional<ResourceID> failAllocation(@Nullable ResourceID resourceID, AllocationID allocationId, Exception cause) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        /**
         * Reports every recorded slot of every task manager as available with utilization 0.
         *
         * @return an unmodifiable collection over a fresh snapshot of the slots
         */
        @Nonnull
        public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
            List<SlotInfoWithUtilization> allSlotInfos = this.registeredSlots.values().stream()
                    .flatMap(Collection::stream)
                    .map(slot -> SlotInfoWithUtilization.from(slot, 0.0))
                    .collect(Collectors.toList());
            return Collections.unmodifiableCollection(allSlotInfos);
        }

        /** @return always an empty collection */
        public Collection<SlotInfo> getAllocatedSlotsInformation() {
            return Collections.emptyList();
        }

        public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID, @Nonnull ResourceProfile requirementProfile) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }

        /** @return a new future that this class never completes */
        @Nonnull
        public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, @Nonnull Collection<AllocationID> preferredAllocations, @Nullable Time timeout) {
            return new CompletableFuture<PhysicalSlot>();
        }

        /** @return a new future that this class never completes */
        @Nonnull
        public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, @Nonnull Collection<AllocationID> preferredAllocations) {
            return new CompletableFuture<PhysicalSlot>();
        }

        /** No-op. */
        public void disableBatchSlotRequestTimeoutCheck() {
        }

        /**
         * Builds a report listing every recorded slot of the given task manager; the report is
         * empty when the task manager is unknown.
         */
        public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {
            Collection<SlotInfo> slotInfos = this.registeredSlots.getOrDefault(taskManagerId, Collections.emptyList());
            List<AllocatedSlotInfo> allocatedSlotInfos = slotInfos.stream()
                    .map(slotInfo -> new AllocatedSlotInfo(slotInfo.getPhysicalSlotNumber(), slotInfo.getAllocationId()))
                    .collect(Collectors.toList());
            return new AllocatedSlotReport(this.jobId, allocatedSlotInfos);
        }

        /** No-op. */
        public void setIsJobRestarting(boolean isJobRestarting) {
        }

        public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable cause) {
            throw new UnsupportedOperationException("TestingSlotPool does not support this operation.");
        }
    }

    /** {@link SlotPoolServiceFactory} that produces {@link TestingSlotPool} instances sharing one latch. */
    private static final class TestingSlotPoolFactory implements SlotPoolServiceFactory {
        /** Latch passed to every pool this factory creates. */
        private final OneShotLatch hasReceivedSlotOffers;

        public TestingSlotPoolFactory(OneShotLatch hasReceivedSlotOffers) {
            this.hasReceivedSlotOffers = hasReceivedSlotOffers;
        }

        /**
         * Creates a new {@link TestingSlotPool} for the job.
         *
         * @param jobId id of the job the pool belongs to
         * @param declarativeSlotPoolFactory ignored
         */
        @Nonnull
        public SlotPoolService createSlotPoolService(@Nonnull JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) {
            TestingSlotPool slotPool = new TestingSlotPool(jobId, this.hasReceivedSlotOffers);
            return slotPool;
        }
    }
}

