/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blocklist.BlockedNode;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.blocklist.DefaultBlocklistHandler;
import org.apache.flink.runtime.blocklist.NoOpBlocklistHandler;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.DefaultJobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots;
import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ResourceManagerTest {
    private static final Time TIMEOUT = Time.minutes((long)2L);
    private static final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
    private static final HeartbeatServices fastHeartbeatServices = new HeartbeatServices(1L, 1L);
    private static final HeartbeatServices failedRpcEnabledHeartbeatServices = new HeartbeatServices(1L, 10000000L, 1);
    private static final HardwareDescription hardwareDescription = new HardwareDescription(42, 1337L, 1337L, 0L);
    private static final int dataPort = 1234;
    private static final int jmxPort = 23456;
    private static TestingRpcService rpcService;
    private TestingHighAvailabilityServices highAvailabilityServices;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private ResourceID resourceManagerResourceId;
    private TestingResourceManager resourceManager;
    private ResourceManagerId resourceManagerId;

    ResourceManagerTest() {
    }

    @BeforeAll
    static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @BeforeEach
    void setup() throws Exception {
        this.highAvailabilityServices = new TestingHighAvailabilityServices();
        this.highAvailabilityServices.setResourceManagerLeaderElectionService(new TestingLeaderElectionService());
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.resourceManagerResourceId = ResourceID.generate();
    }

    @AfterEach
    void after() throws Exception {
        if (this.resourceManager != null) {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{this.resourceManager});
        }
        if (this.highAvailabilityServices != null) {
            this.highAvailabilityServices.closeAndCleanupAllData();
        }
        if (this.testingFatalErrorHandler.hasExceptionOccurred()) {
            this.testingFatalErrorHandler.rethrowError();
        }
        if (rpcService != null) {
            rpcService.clearGateways();
        }
    }

    @AfterAll
    static void tearDownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService((RpcService[])new RpcService[]{rpcService});
        }
    }

    @Test
    void testRequestTaskManagerInfo() throws Exception {
        ResourceID taskManagerId = ResourceID.generate();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        this.resourceManager = new ResourceManagerBuilder().buildAndStart();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        this.registerTaskExecutor(resourceManagerGateway, taskManagerId, taskExecutorGateway.getAddress());
        CompletableFuture taskManagerInfoFuture = resourceManagerGateway.requestTaskManagerDetailsInfo(taskManagerId, TestingUtils.TIMEOUT);
        TaskManagerInfoWithSlots taskManagerInfoWithSlots = (TaskManagerInfoWithSlots)taskManagerInfoFuture.get();
        TaskManagerInfo taskManagerInfo = taskManagerInfoWithSlots.getTaskManagerInfo();
        Assertions.assertThat((Object)taskManagerInfo.getResourceId()).isEqualTo((Object)taskManagerId);
        Assertions.assertThat((Object)taskManagerInfo.getHardwareDescription()).isEqualTo((Object)hardwareDescription);
        Assertions.assertThat((String)taskManagerInfo.getAddress()).isEqualTo(taskExecutorGateway.getAddress());
        Assertions.assertThat((int)taskManagerInfo.getDataPort()).isEqualTo(1234);
        Assertions.assertThat((int)taskManagerInfo.getJmxPort()).isEqualTo(23456);
        Assertions.assertThat((int)taskManagerInfo.getNumberSlots()).isEqualTo(0);
        Assertions.assertThat((int)taskManagerInfo.getNumberAvailableSlots()).isEqualTo(0);
        Assertions.assertThat((Collection)taskManagerInfoWithSlots.getAllocatedSlots()).isEmpty();
    }

    @Test
    void testRequestTaskExecutorGateway() throws Exception {
        ResourceID taskManagerId = ResourceID.generate();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        this.resourceManager = new ResourceManagerBuilder().buildAndStart();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        this.registerTaskExecutor(resourceManagerGateway, taskManagerId, taskExecutorGateway.getAddress());
        CompletableFuture taskExecutorGatewayFuture = resourceManagerGateway.requestTaskExecutorThreadInfoGateway(taskManagerId, TestingUtils.TIMEOUT);
        TaskExecutorThreadInfoGateway taskExecutorGatewayResult = (TaskExecutorThreadInfoGateway)taskExecutorGatewayFuture.get();
        Assertions.assertThat((Object)taskExecutorGatewayResult).isEqualTo((Object)taskExecutorGateway);
    }

    private void registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, ResourceID taskExecutorId, String taskExecutorAddress) throws Exception {
        TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorAddress, taskExecutorId, 1234, 23456, hardwareDescription, new TaskExecutorMemoryConfiguration(Long.valueOf(1L), Long.valueOf(2L), Long.valueOf(3L), Long.valueOf(4L), Long.valueOf(5L), Long.valueOf(6L), Long.valueOf(7L), Long.valueOf(8L), Long.valueOf(9L), Long.valueOf(10L)), ResourceProfile.ZERO, ResourceProfile.ZERO, taskExecutorAddress);
        CompletableFuture registrationFuture = resourceManagerGateway.registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT);
        Assertions.assertThat(registrationFuture.get()).isInstanceOf(RegistrationResponse.Success.class);
    }

    @Test
    void testDisconnectJobManagerClearsRequirements() throws Exception {
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        TestingJobLeaderIdService jobLeaderIdService = TestingJobLeaderIdService.newBuilder().setGetLeaderIdFunction(jobId -> CompletableFuture.completedFuture(jobMasterGateway.getFencingToken())).build();
        CompletableFuture clearRequirementsFuture = new CompletableFuture();
        TestingSlotManager slotManager = new TestingSlotManagerBuilder().setClearRequirementsConsumer(clearRequirementsFuture::complete).createSlotManager();
        this.resourceManager = new ResourceManagerBuilder().withJobLeaderIdService(jobLeaderIdService).withSlotManager(slotManager).buildAndStart();
        JobID jobId2 = JobID.generate();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        resourceManagerGateway.registerJobMaster(jobMasterGateway.getFencingToken(), ResourceID.generate(), jobMasterGateway.getAddress(), jobId2, TIMEOUT).get();
        resourceManagerGateway.declareRequiredResources(jobMasterGateway.getFencingToken(), ResourceRequirements.create((JobID)jobId2, (String)jobMasterGateway.getAddress(), Collections.singleton(ResourceRequirement.create((ResourceProfile)ResourceProfile.UNKNOWN, (int)1))), TIMEOUT).get();
        resourceManagerGateway.disconnectJobManager(jobId2, JobStatus.FINISHED, (Exception)((Object)new FlinkException("Test exception")));
        Assertions.assertThat((Comparable)((Comparable)clearRequirementsFuture.get(5L, TimeUnit.SECONDS))).isEqualTo((Object)jobId2);
    }

    @Test
    void testProcessResourceRequirementsWhenRecoveryFinished() throws Exception {
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        TestingJobLeaderIdService jobLeaderIdService = TestingJobLeaderIdService.newBuilder().setGetLeaderIdFunction(jobId -> CompletableFuture.completedFuture(jobMasterGateway.getFencingToken())).build();
        CompletableFuture processRequirementsFuture = new CompletableFuture();
        CompletableFuture<Void> readyToServeFuture = new CompletableFuture<Void>();
        TestingSlotManager slotManager = new TestingSlotManagerBuilder().setProcessRequirementsConsumer(r -> processRequirementsFuture.complete(null)).createSlotManager();
        this.resourceManager = new ResourceManagerBuilder().withJobLeaderIdService(jobLeaderIdService).withSlotManager(slotManager).withReadyToServeFuture(readyToServeFuture).buildAndStart();
        JobID jobId2 = JobID.generate();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        resourceManagerGateway.registerJobMaster(jobMasterGateway.getFencingToken(), ResourceID.generate(), jobMasterGateway.getAddress(), jobId2, TIMEOUT).get();
        resourceManagerGateway.declareRequiredResources(jobMasterGateway.getFencingToken(), ResourceRequirements.create((JobID)jobId2, (String)jobMasterGateway.getAddress(), Collections.singleton(ResourceRequirement.create((ResourceProfile)ResourceProfile.UNKNOWN, (int)1))), TIMEOUT);
        this.resourceManager.runInMainThread(() -> {
            Assertions.assertThat((boolean)processRequirementsFuture.isDone()).isFalse();
            readyToServeFuture.complete(null);
            Assertions.assertThat((boolean)processRequirementsFuture.isDone()).isTrue();
            return null;
        }, TIMEOUT).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    @Test
    void testHeartbeatTimeoutWithJobMaster() throws Exception {
        CompletableFuture heartbeatRequestFuture = new CompletableFuture();
        CompletableFuture disconnectFuture = new CompletableFuture();
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setResourceManagerHeartbeatFunction(resourceId -> {
            heartbeatRequestFuture.complete(resourceId);
            return FutureUtils.completedVoidFuture();
        }).setDisconnectResourceManagerConsumer(disconnectFuture::complete).build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        JobID jobId = new JobID();
        ResourceID jobMasterResourceId = ResourceID.generate();
        SettableLeaderRetrievalService jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(requestedJobId -> {
            Assertions.assertThat((Comparable)requestedJobId).isEqualTo((Object)jobId);
            return jobMasterLeaderRetrievalService;
        });
        this.runHeartbeatTimeoutTest(ignore -> {}, (org.apache.flink.util.function.ThrowingConsumer<ResourceManagerGateway, Exception>)((org.apache.flink.util.function.ThrowingConsumer)resourceManagerGateway -> {
            CompletableFuture registrationFuture = resourceManagerGateway.registerJobMaster(jobMasterGateway.getFencingToken(), jobMasterResourceId, jobMasterGateway.getAddress(), jobId, TIMEOUT);
            Assertions.assertThat(registrationFuture.get()).isInstanceOf(RegistrationResponse.Success.class);
        }), (org.apache.flink.util.function.ThrowingConsumer<ResourceID, Exception>)((org.apache.flink.util.function.ThrowingConsumer)resourceManagerResourceId -> {
            ResourceID optionalHeartbeatRequestOrigin = heartbeatRequestFuture.getNow(null);
            Assertions.assertThat((Object)optionalHeartbeatRequestOrigin).satisfiesAnyOf(new ThrowingConsumer[]{resourceID -> {
                ObjectAssert cfr_ignored_0 = (ObjectAssert)Assertions.assertThat((Object)resourceID).isEqualTo(resourceManagerResourceId);
            }, resourceID -> Assertions.assertThat((Object)resourceID).isNull()});
            Assertions.assertThat((Comparable)((Comparable)disconnectFuture.get())).isEqualTo((Object)this.resourceManagerId);
        }));
    }

    @Test
    void testJobMasterBecomesUnreachableTriggersDisconnect() throws Exception {
        JobID jobId = new JobID();
        ResourceID jobMasterResourceId = ResourceID.generate();
        CompletableFuture disconnectFuture = new CompletableFuture();
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).setResourceManagerHeartbeatFunction(resourceId -> FutureUtils.completedExceptionally((Throwable)new RecipientUnreachableException("sender", "recipient", "task executor is unreachable"))).setDisconnectResourceManagerConsumer(disconnectFuture::complete).build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        SettableLeaderRetrievalService jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(requestedJobId -> {
            Assertions.assertThat((Comparable)requestedJobId).isEqualTo((Object)jobId);
            return jobMasterLeaderRetrievalService;
        });
        this.runHeartbeatTargetBecomesUnreachableTest(ignore -> {}, (org.apache.flink.util.function.ThrowingConsumer<ResourceManagerGateway, Exception>)((org.apache.flink.util.function.ThrowingConsumer)resourceManagerGateway -> {
            CompletableFuture registrationFuture = resourceManagerGateway.registerJobMaster(jobMasterGateway.getFencingToken(), jobMasterResourceId, jobMasterGateway.getAddress(), jobId, TIMEOUT);
            Assertions.assertThat(registrationFuture.get()).isInstanceOf(RegistrationResponse.Success.class);
        }), (org.apache.flink.util.function.ThrowingConsumer<ResourceID, Exception>)((org.apache.flink.util.function.ThrowingConsumer)resourceManagerResourceId -> {
            AbstractComparableAssert cfr_ignored_0 = (AbstractComparableAssert)Assertions.assertThat((Comparable)((Comparable)disconnectFuture.get())).isEqualTo((Object)this.resourceManagerId);
        }));
    }

    @Test
    void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
        ResourceID taskExecutorId = ResourceID.generate();
        CompletableFuture heartbeatRequestFuture = new CompletableFuture();
        CompletableFuture disconnectFuture = new CompletableFuture();
        CompletableFuture stopWorkerFuture = new CompletableFuture();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setDisconnectResourceManagerConsumer(disconnectFuture::complete).setHeartbeatResourceManagerFunction(resourceId -> {
            heartbeatRequestFuture.complete(resourceId);
            return FutureUtils.completedVoidFuture();
        }).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        this.runHeartbeatTimeoutTest(builder -> ((ResourceManagerBuilder)builder).withStopWorkerFunction(worker -> {
            stopWorkerFuture.complete(worker);
            return true;
        }), (org.apache.flink.util.function.ThrowingConsumer<ResourceManagerGateway, Exception>)((org.apache.flink.util.function.ThrowingConsumer)resourceManagerGateway -> this.registerTaskExecutor((ResourceManagerGateway)resourceManagerGateway, taskExecutorId, taskExecutorGateway.getAddress())), (org.apache.flink.util.function.ThrowingConsumer<ResourceID, Exception>)((org.apache.flink.util.function.ThrowingConsumer)resourceManagerResourceId -> {
            ResourceID optionalHeartbeatRequestOrigin = heartbeatRequestFuture.getNow(null);
            Assertions.assertThat((Object)optionalHeartbeatRequestOrigin).satisfiesAnyOf(new ThrowingConsumer[]{resourceID -> {
                ObjectAssert cfr_ignored_0 = (ObjectAssert)Assertions.assertThat((Object)resourceID).isEqualTo(resourceManagerResourceId);
            }, resourceID -> Assertions.assertThat((Object)resourceID).isNull()});
            Assertions.assertThat((Throwable)((Throwable)disconnectFuture.get())).isInstanceOf(TimeoutException.class);
            Assertions.assertThat(stopWorkerFuture.get()).isEqualTo((Object)taskExecutorId);
        }));
    }

    @Test
    void testTaskExecutorBecomesUnreachableTriggersDisconnect() throws Exception {
        ResourceID taskExecutorId = ResourceID.generate();
        CompletableFuture disconnectFuture = new CompletableFuture();
        CompletableFuture stopWorkerFuture = new CompletableFuture();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).setDisconnectResourceManagerConsumer(disconnectFuture::complete).setHeartbeatResourceManagerFunction(resourceId -> FutureUtils.completedExceptionally((Throwable)new RecipientUnreachableException("sender", "recipient", "task executor is unreachable"))).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        this.runHeartbeatTargetBecomesUnreachableTest(builder -> ((ResourceManagerBuilder)builder).withStopWorkerFunction(worker -> {
            stopWorkerFuture.complete(worker);
            return true;
        }), (org.apache.flink.util.function.ThrowingConsumer<ResourceManagerGateway, Exception>)((org.apache.flink.util.function.ThrowingConsumer)resourceManagerGateway -> this.registerTaskExecutor((ResourceManagerGateway)resourceManagerGateway, taskExecutorId, taskExecutorGateway.getAddress())), (org.apache.flink.util.function.ThrowingConsumer<ResourceID, Exception>)((org.apache.flink.util.function.ThrowingConsumer)resourceManagerResourceId -> {
            Assertions.assertThat((Throwable)((Throwable)disconnectFuture.get())).isInstanceOf(ResourceManagerException.class);
            Assertions.assertThat(stopWorkerFuture.get()).isEqualTo((Object)taskExecutorId);
        }));
    }

    @Test
    void testDisconnectJobManagerWithTerminalStatusShouldRemoveJob() throws Exception {
        this.testDisconnectJobManager(JobStatus.CANCELED);
    }

    @Test
    void testDisconnectJobManagerWithNonTerminalStatusShouldNotRemoveJob() throws Exception {
        this.testDisconnectJobManager(JobStatus.FAILING);
    }

    @Test
    void testDisconnectTaskManager() throws Exception {
        ResourceID taskExecutorId = ResourceID.generate();
        CompletableFuture disconnectFuture = new CompletableFuture();
        CompletableFuture stopWorkerFuture = new CompletableFuture();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setDisconnectResourceManagerConsumer(disconnectFuture::complete).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        this.resourceManager = new ResourceManagerBuilder().withStopWorkerFunction(stopWorkerFuture::complete).buildAndStart();
        this.registerTaskExecutor((ResourceManagerGateway)this.resourceManager, taskExecutorId, taskExecutorGateway.getAddress());
        this.resourceManager.disconnectTaskManager(taskExecutorId, (Exception)((Object)new FlinkException("Test exception")));
        Assertions.assertThat((Throwable)((Throwable)disconnectFuture.get())).isInstanceOf(FlinkException.class);
        Assertions.assertThat(stopWorkerFuture.get()).isEqualTo((Object)taskExecutorId);
    }

    @Test
    void testUnblockResourcesWillTriggerResourceRequirementsCheck() throws Exception {
        CompletableFuture triggerRequirementsCheckFuture = new CompletableFuture();
        TestingSlotManager slotManager = new TestingSlotManagerBuilder().setTriggerRequirementsCheckConsumer(triggerRequirementsCheckFuture::complete).createSlotManager();
        this.resourceManager = new ResourceManagerBuilder().withSlotManager(slotManager).withBlocklistHandlerFactory((BlocklistHandler.Factory)new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L))).buildAndStart();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        resourceManagerGateway.notifyNewBlockedNodes(Collections.singleton(new BlockedNode("node", "Test cause", System.currentTimeMillis())));
        triggerRequirementsCheckFuture.get();
    }

    @Test
    void testNewlyAddedBlockedNodesWillBeSynchronizedToAllRegisteredJobMasters() throws Exception {
        JobID jobId1 = JobID.generate();
        JobID jobId2 = JobID.generate();
        ArrayList<BlockedNode> receivedBlockedNodes1 = new ArrayList<BlockedNode>();
        ArrayList<BlockedNode> receivedBlockedNodes2 = new ArrayList<BlockedNode>();
        JobMasterGateway jobMasterGateway1 = this.createJobMasterGateway(receivedBlockedNodes1);
        JobMasterGateway jobMasterGateway2 = this.createJobMasterGateway(receivedBlockedNodes2);
        TestingJobLeaderIdService jobLeaderIdService = TestingJobLeaderIdService.newBuilder().setGetLeaderIdFunction(jobId -> {
            JobMasterGateway leader;
            if (jobId.equals((Object)jobId1)) {
                leader = jobMasterGateway1;
            } else if (jobId.equals((Object)jobId2)) {
                leader = jobMasterGateway2;
            } else {
                throw new IllegalArgumentException("Unknown job");
            }
            return CompletableFuture.completedFuture(leader.getFencingToken());
        }).build();
        this.resourceManager = new ResourceManagerBuilder().withJobLeaderIdService(jobLeaderIdService).withBlocklistHandlerFactory((BlocklistHandler.Factory)new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L))).buildAndStart();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        ResourceManagerTest.registerJobMasterToResourceManager(resourceManagerGateway, jobMasterGateway1, jobId1);
        ResourceManagerTest.registerJobMasterToResourceManager(resourceManagerGateway, jobMasterGateway2, jobId2);
        BlockedNode blockedNode1 = new BlockedNode("node1", "Test exception", Long.MAX_VALUE);
        resourceManagerGateway.notifyNewBlockedNodes(Collections.singleton(blockedNode1)).get();
        Assertions.assertThat(receivedBlockedNodes1).containsExactly((Object[])new BlockedNode[]{blockedNode1});
        Assertions.assertThat(receivedBlockedNodes2).containsExactly((Object[])new BlockedNode[]{blockedNode1});
        resourceManagerGateway.disconnectJobManager(jobId1, JobStatus.FINISHED, (Exception)((Object)new FlinkException("Test exception")));
        BlockedNode blockedNode2 = new BlockedNode("node2", "Test exception", Long.MAX_VALUE);
        resourceManagerGateway.notifyNewBlockedNodes(Collections.singleton(blockedNode2)).get();
        Assertions.assertThat(receivedBlockedNodes1).containsExactly((Object[])new BlockedNode[]{blockedNode1});
        Assertions.assertThat(receivedBlockedNodes2).containsExactlyInAnyOrder((Object[])new BlockedNode[]{blockedNode1, blockedNode2});
    }

    @Test
    void testResourceOverviewWithBlockedSlots() throws Exception {
        ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor();
        DeclarativeSlotManager slotManager = DeclarativeSlotManagerBuilder.newBuilder((ScheduledExecutor)executor).build();
        this.resourceManager = new ResourceManagerBuilder().withSlotManager((SlotManager)slotManager).withBlocklistHandlerFactory((BlocklistHandler.Factory)new DefaultBlocklistHandler.Factory(Duration.ofMillis(100L))).buildAndStart();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        ResourceID taskExecutor = ResourceID.generate();
        ResourceID taskExecutorToBlock = ResourceID.generate();
        this.registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutor, 3);
        this.registerTaskExecutorAndSlot(resourceManagerGateway, taskExecutorToBlock, 5);
        executor.triggerAll();
        ResourceOverview overview = (ResourceOverview)resourceManagerGateway.requestResourceOverview(Time.seconds((long)5L)).get();
        Assertions.assertThat((int)overview.getNumberTaskManagers()).isEqualTo(2);
        Assertions.assertThat((int)overview.getNumberRegisteredSlots()).isEqualTo(8);
        Assertions.assertThat((int)overview.getNumberFreeSlots()).isEqualTo(8);
        Assertions.assertThat((int)overview.getNumberBlockedTaskManagers()).isEqualTo(0);
        Assertions.assertThat((int)overview.getNumberBlockedFreeSlots()).isEqualTo(0);
        Assertions.assertThat((Object)overview.getTotalResource()).isEqualTo((Object)ResourceProfile.fromResources((double)1.0, (int)1024).multiply(8));
        Assertions.assertThat((Object)overview.getFreeResource()).isEqualTo((Object)ResourceProfile.fromResources((double)1.0, (int)1024).multiply(8));
        resourceManagerGateway.notifyNewBlockedNodes(Collections.singleton(new BlockedNode(this.resourceManager.getNodeIdOfTaskManager(taskExecutorToBlock), "Test cause", Long.MAX_VALUE)));
        ResourceOverview overviewBlocked = (ResourceOverview)resourceManagerGateway.requestResourceOverview(Time.seconds((long)5L)).get();
        Assertions.assertThat((int)overviewBlocked.getNumberTaskManagers()).isEqualTo(2);
        Assertions.assertThat((int)overviewBlocked.getNumberRegisteredSlots()).isEqualTo(8);
        Assertions.assertThat((int)overviewBlocked.getNumberFreeSlots()).isEqualTo(3);
        Assertions.assertThat((int)overviewBlocked.getNumberBlockedTaskManagers()).isEqualTo(1);
        Assertions.assertThat((int)overviewBlocked.getNumberBlockedFreeSlots()).isEqualTo(5);
        Assertions.assertThat((Object)overviewBlocked.getTotalResource()).isEqualTo((Object)ResourceProfile.fromResources((double)1.0, (int)1024).multiply(8));
        Assertions.assertThat((Object)overviewBlocked.getFreeResource()).isEqualTo((Object)ResourceProfile.fromResources((double)1.0, (int)1024).multiply(3));
    }

    private void registerTaskExecutorAndSlot(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerId, int slotCount) throws Exception {
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorGateway.getAddress(), taskManagerId, 1234, 23456, hardwareDescription, new TaskExecutorMemoryConfiguration(Long.valueOf(1L), Long.valueOf(2L), Long.valueOf(3L), Long.valueOf(4L), Long.valueOf(5L), Long.valueOf(6L), Long.valueOf(7L), Long.valueOf(8L), Long.valueOf(9L), Long.valueOf(10L)), ResourceProfile.fromResources((double)1.0, (int)1024), ResourceProfile.fromResources((double)1.0, (int)1024).multiply(slotCount), taskExecutorGateway.getAddress());
        RegistrationResponse registrationResult = (RegistrationResponse)resourceManagerGateway.registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT).get();
        Assertions.assertThat((Object)registrationResult).isInstanceOf(TaskExecutorRegistrationSuccess.class);
        InstanceID instanceID = ((TaskExecutorRegistrationSuccess)registrationResult).getRegistrationId();
        ArrayList<SlotStatus> slots = new ArrayList<SlotStatus>();
        for (int i = 0; i < slotCount; ++i) {
            slots.add(new SlotStatus(new SlotID(taskManagerId, i), ResourceProfile.fromResources((double)1.0, (int)1024)));
        }
        resourceManagerGateway.sendSlotReport(taskManagerId, instanceID, new SlotReport(slots), Time.seconds((long)5L));
    }

    private JobMasterGateway createJobMasterGateway(Collection<BlockedNode> receivedBlockedNodes) {
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setNotifyNewBlockedNodesFunction(blockedNodes -> {
            receivedBlockedNodes.addAll((Collection<BlockedNode>)blockedNodes);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).setAddress(UUID.randomUUID().toString()).build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        return jobMasterGateway;
    }

    private static void registerJobMasterToResourceManager(ResourceManagerGateway resourceManagerGateway, JobMasterGateway jobMasterGateway, JobID jobId) throws Exception {
        resourceManagerGateway.registerJobMaster((JobMasterId)jobMasterGateway.getFencingToken(), ResourceID.generate(), jobMasterGateway.getAddress(), jobId, TIMEOUT).get();
    }

    private void testDisconnectJobManager(JobStatus jobStatus) throws Exception {
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        OneShotLatch jobAdded = new OneShotLatch();
        OneShotLatch jobRemoved = new OneShotLatch();
        TestingJobLeaderIdService jobLeaderIdService = TestingJobLeaderIdService.newBuilder().setAddJobConsumer(ignored -> jobAdded.trigger()).setRemoveJobConsumer(ignored -> jobRemoved.trigger()).build();
        this.resourceManager = new ResourceManagerBuilder().withJobLeaderIdService(jobLeaderIdService).buildAndStart();
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(requestedJobId -> new SettableLeaderRetrievalService(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID()));
        JobID jobId = JobID.generate();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        resourceManagerGateway.registerJobMaster(jobMasterGateway.getFencingToken(), ResourceID.generate(), jobMasterGateway.getAddress(), jobId, TIMEOUT);
        jobAdded.await();
        resourceManagerGateway.disconnectJobManager(jobId, jobStatus, (Exception)((Object)new FlinkException("Test exception")));
        if (jobStatus.isGloballyTerminalState()) {
            jobRemoved.await();
        } else {
            Assertions.assertThatThrownBy(() -> jobRemoved.await(10L, TimeUnit.MILLISECONDS), (String)"We should not have removed the job.", (Object[])new Object[0]).isInstanceOf(TimeoutException.class);
        }
    }

    private void runHeartbeatTimeoutTest(Consumer<ResourceManagerBuilder> prepareResourceManager, org.apache.flink.util.function.ThrowingConsumer<ResourceManagerGateway, Exception> registerComponentAtResourceManager, org.apache.flink.util.function.ThrowingConsumer<ResourceID, Exception> verifyHeartbeatTimeout) throws Exception {
        ResourceManagerBuilder rmBuilder = new ResourceManagerBuilder();
        prepareResourceManager.accept(rmBuilder);
        this.resourceManager = rmBuilder.withHeartbeatServices(ResourceManagerTest.fastHeartbeatServices).buildAndStart();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        registerComponentAtResourceManager.accept((Object)resourceManagerGateway);
        verifyHeartbeatTimeout.accept((Object)this.resourceManagerResourceId);
    }

    private void runHeartbeatTargetBecomesUnreachableTest(Consumer<ResourceManagerBuilder> prepareResourceManager, org.apache.flink.util.function.ThrowingConsumer<ResourceManagerGateway, Exception> registerComponentAtResourceManager, org.apache.flink.util.function.ThrowingConsumer<ResourceID, Exception> verifyHeartbeatTimeout) throws Exception {
        ResourceManagerBuilder rmBuilder = new ResourceManagerBuilder();
        prepareResourceManager.accept(rmBuilder);
        this.resourceManager = rmBuilder.withHeartbeatServices(ResourceManagerTest.failedRpcEnabledHeartbeatServices).buildAndStart();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        registerComponentAtResourceManager.accept((Object)resourceManagerGateway);
        verifyHeartbeatTimeout.accept((Object)this.resourceManagerResourceId);
    }

    private class ResourceManagerBuilder {
        private HeartbeatServices heartbeatServices = null;
        private JobLeaderIdService jobLeaderIdService = null;
        private SlotManager slotManager = null;
        private BlocklistHandler.Factory blocklistHandlerFactory = new NoOpBlocklistHandler.Factory();
        private Function<ResourceID, Boolean> stopWorkerFunction = null;
        private CompletableFuture<Void> readyToServeFuture = CompletableFuture.completedFuture(null);

        private ResourceManagerBuilder() {
        }

        private ResourceManagerBuilder withHeartbeatServices(HeartbeatServices heartbeatServices) {
            this.heartbeatServices = heartbeatServices;
            return this;
        }

        private ResourceManagerBuilder withJobLeaderIdService(JobLeaderIdService jobLeaderIdService) {
            this.jobLeaderIdService = jobLeaderIdService;
            return this;
        }

        private ResourceManagerBuilder withSlotManager(SlotManager slotManager) {
            this.slotManager = slotManager;
            return this;
        }

        private ResourceManagerBuilder withBlocklistHandlerFactory(BlocklistHandler.Factory blocklistHandlerFactory) {
            this.blocklistHandlerFactory = blocklistHandlerFactory;
            return this;
        }

        private ResourceManagerBuilder withStopWorkerFunction(Function<ResourceID, Boolean> stopWorkerFunction) {
            this.stopWorkerFunction = stopWorkerFunction;
            return this;
        }

        public ResourceManagerBuilder withReadyToServeFuture(CompletableFuture<Void> readyToServeFuture) {
            this.readyToServeFuture = readyToServeFuture;
            return this;
        }

        private TestingResourceManager buildAndStart() throws Exception {
            if (this.heartbeatServices == null) {
                this.heartbeatServices = heartbeatServices;
            }
            if (this.jobLeaderIdService == null) {
                this.jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)ResourceManagerTest.this.highAvailabilityServices, rpcService.getScheduledExecutor(), TestingUtils.infiniteTime());
            }
            if (this.slotManager == null) {
                this.slotManager = DeclarativeSlotManagerBuilder.newBuilder(rpcService.getScheduledExecutor()).build();
            }
            if (this.stopWorkerFunction == null) {
                this.stopWorkerFunction = ignore -> false;
            }
            ResourceManagerTest.this.resourceManagerId = ResourceManagerId.generate();
            TestingResourceManager resourceManager = new TestingResourceManager(rpcService, ResourceManagerTest.this.resourceManagerId.toUUID(), ResourceManagerTest.this.resourceManagerResourceId, this.heartbeatServices, (DelegationTokenManager)new NoOpDelegationTokenManager(), this.slotManager, NoOpResourceManagerPartitionTracker::get, this.blocklistHandlerFactory, this.jobLeaderIdService, ResourceManagerTest.this.testingFatalErrorHandler, UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(), this.stopWorkerFunction, this.readyToServeFuture);
            resourceManager.start();
            resourceManager.getStartedFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
            return resourceManager;
        }
    }
}

