/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.math.BigDecimal;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManagerTestBase;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingTaskManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocationResult;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocationStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceDeclaration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfigurationBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotState;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerInfo;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerSlotInformation;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.Test;

class FineGrainedSlotManagerTest
extends FineGrainedSlotManagerTestBase {
    /**
     * A resource profile twice the size of {@code DEFAULT_TOTAL_RESOURCE_PROFILE}. In this file it
     * is passed as the last argument of {@code registerTaskManager} (presumably the default slot
     * profile of a "large" task manager - confirm against the SlotManager API).
     */
    private static final ResourceProfile LARGE_SLOT_RESOURCE_PROFILE = DEFAULT_TOTAL_RESOURCE_PROFILE.multiply(2);
    /**
     * A resource profile twice the size of {@link #LARGE_SLOT_RESOURCE_PROFILE}, i.e. four times
     * {@code DEFAULT_TOTAL_RESOURCE_PROFILE}. Used together with
     * {@link #LARGE_SLOT_RESOURCE_PROFILE} when registering a task manager that must not match a
     * default-sized pending task manager.
     */
    private static final ResourceProfile LARGE_TOTAL_RESOURCE_PROFILE = LARGE_SLOT_RESOURCE_PROFILE.multiply(2);

    /** Package-private no-arg constructor; it only delegates to the base test class. */
    FineGrainedSlotManagerTest() {
        super();
    }

    /**
     * Supplies no explicit allocation strategy to the base class.
     *
     * @return always an empty {@link Optional}
     */
    @Override
    protected Optional<ResourceAllocationStrategy> getResourceAllocationStrategy() {
        final Optional<ResourceAllocationStrategy> noStrategy = Optional.empty();
        return noStrategy;
    }

    /**
     * Smoke test: hands an empty test body to {@code Context#runTest}, so only the work done by
     * {@code runTest} itself is exercised.
     *
     * @throws Exception if {@code runTest} fails
     */
    @Test
    void testInitializeAndClose() throws Exception {
        new FineGrainedSlotManagerTestBase.Context() {
            {
                // Deliberately nothing to do inside the test body.
                runTest(() -> {
                });
            }
        };
    }

    /**
     * Registers a single task manager with an empty slot report and checks that
     *
     * <ul>
     *   <li>the registration result is {@code SUCCESS},
     *   <li>the slot manager reports two registered slots,
     *   <li>the task manager tracker knows exactly that one task manager, and
     *   <li>both its available and total resources equal {@code DEFAULT_TOTAL_RESOURCE_PROFILE}.
     * </ul>
     *
     * @throws Exception if the test context fails
     */
    @Test
    void testTaskManagerRegistration() throws Exception {
        final TaskExecutorConnection taskManagerConnection = createTaskExecutorConnection();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    final CompletableFuture<SlotManager.RegistrationResult> registerTaskManagerFuture =
                            new CompletableFuture<>();
                    // The slot manager is only touched from the main thread; the result is
                    // handed back to the test thread through the future.
                    runInMainThread(() -> registerTaskManagerFuture.complete(
                            getSlotManager().registerTaskManager(
                                    taskManagerConnection,
                                    new SlotReport(),
                                    FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                    FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)));
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registerTaskManagerFuture))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getSlotManager().getNumberRegisteredSlots()).isEqualTo(2);
                    Assertions.assertThat(getTaskManagerTracker().getRegisteredTaskManagers()).hasSize(1);
                    Assertions.assertThat(getTaskManagerTracker().getRegisteredTaskManager(taskManagerConnection.getInstanceID()))
                            .isPresent();
                    Assertions.assertThat(
                                    getTaskManagerTracker()
                                            .getRegisteredTaskManager(taskManagerConnection.getInstanceID())
                                            .get()
                                            .getAvailableResource())
                            .isEqualTo(FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE);
                    Assertions.assertThat(
                                    getTaskManagerTracker()
                                            .getRegisteredTaskManager(taskManagerConnection.getInstanceID())
                                            .get()
                                            .getTotalResource())
                            .isEqualTo(FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE);
                });
            }
        };
    }

    /**
     * Registers a task manager that already reports one allocated slot, verifies the slot is
     * tracked as {@code ALLOCATED}, then unregisters the task manager and verifies that both the
     * task manager and the slot are gone from the tracker.
     *
     * @throws Exception if the test context fails
     */
    @Test
    void testTaskManagerUnregistration() throws Exception {
        // Slot requests sent to this gateway return a future that is never completed.
        final TestingTaskExecutorGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setRequestSlotFunction(tuple6 -> new CompletableFuture<>())
                        .createTestingTaskExecutorGateway();
        final TaskExecutorConnection taskManagerConnection =
                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
        final AllocationID allocationId = new AllocationID();
        final SlotReport slotReport =
                new SlotReport(createAllocatedSlotStatus(allocationId, DEFAULT_SLOT_RESOURCE_PROFILE));
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    final CompletableFuture<SlotManager.RegistrationResult> registerTaskManagerFuture =
                            new CompletableFuture<>();
                    final CompletableFuture<Boolean> unRegisterTaskManagerFuture = new CompletableFuture<>();
                    runInMainThread(() -> registerTaskManagerFuture.complete(
                            getSlotManager().registerTaskManager(
                                    taskManagerConnection,
                                    slotReport,
                                    FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                    FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)));
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registerTaskManagerFuture))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getTaskManagerTracker().getRegisteredTaskManagers()).hasSize(1);
                    final Optional<TaskManagerSlotInformation> slot =
                            getTaskManagerTracker().getAllocatedOrPendingSlot(allocationId);
                    Assertions.assertThat(slot).isPresent();
                    Assertions.assertThat(slot.get().getState()).isSameAs(SlotState.ALLOCATED);
                    runInMainThread(() -> unRegisterTaskManagerFuture.complete(
                            getSlotManager().unregisterTaskManager(
                                    taskManagerConnection.getInstanceID(),
                                    FineGrainedSlotManagerTestBase.TEST_EXCEPTION)));
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(unRegisterTaskManagerFuture))
                            .isTrue();
                    Assertions.assertThat(getTaskManagerTracker().getRegisteredTaskManagers()).isEmpty();
                    Assertions.assertThat(getTaskManagerTracker().getAllocatedOrPendingSlot(allocationId))
                            .isNotPresent();
                });
            }
        };
    }

    /**
     * Checks when a newly registered task manager is deducted from the pending task managers.
     *
     * <p>The stubbed strategy always asks for one pending task manager with
     * {@code DEFAULT_TOTAL_RESOURCE_PROFILE} and two slots. The test then registers three task
     * managers and expects:
     *
     * <ol>
     *   <li>one with an already allocated slot: the pending task manager remains,
     *   <li>one with the larger {@code LARGE_*} profiles: the pending task manager remains,
     *   <li>one with the default profiles and an empty slot report: no pending task manager is
     *       left.
     * </ol>
     *
     * @throws Exception if the test context fails
     */
    @Test
    void testTaskManagerRegistrationDeductPendingTaskManager() throws Exception {
        final TaskExecutorConnection taskExecutionConnection1 = createTaskExecutorConnection();
        final TaskExecutorConnection taskExecutionConnection2 = createTaskExecutorConnection();
        final TaskExecutorConnection taskExecutionConnection3 = createTaskExecutorConnection();
        final SlotReport slotReportWithAllocatedSlot =
                new SlotReport(createAllocatedSlotStatus(new AllocationID(), DEFAULT_SLOT_RESOURCE_PROFILE));
        new FineGrainedSlotManagerTestBase.Context() {
            {
                resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction((jobRequirements, ignore) -> {
                    Assertions.assertThat(jobRequirements).hasSize(1);
                    final JobID jobID = jobRequirements.keySet().stream().findFirst().get();
                    final ResourceAllocationResult.Builder builder = ResourceAllocationResult.builder();
                    final PendingTaskManager pendingTaskManager =
                            new PendingTaskManager(FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, 2);
                    builder.addPendingTaskManagerAllocate(pendingTaskManager);
                    builder.addAllocationOnPendingResource(
                            jobID,
                            pendingTaskManager.getPendingTaskManagerId(),
                            FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                    return builder.build();
                });
                slotManagerConfigurationBuilder.setRequirementCheckDelay(Duration.ZERO);
                runTest(() -> {
                    final CompletableFuture<SlotManager.RegistrationResult> registerTaskManagerFuture1 =
                            new CompletableFuture<>();
                    final CompletableFuture<SlotManager.RegistrationResult> registerTaskManagerFuture2 =
                            new CompletableFuture<>();
                    final CompletableFuture<SlotManager.RegistrationResult> registerTaskManagerFuture3 =
                            new CompletableFuture<>();
                    runInMainThread(() -> {
                        getSlotManager().processResourceRequirements(
                                FineGrainedSlotManagerTestBase.createResourceRequirementsForSingleSlot());
                        registerTaskManagerFuture1.complete(
                                getSlotManager().registerTaskManager(
                                        taskExecutionConnection1,
                                        slotReportWithAllocatedSlot,
                                        FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                        FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registerTaskManagerFuture1))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getTaskManagerTracker().getPendingTaskManagers()).hasSize(1);
                    runInMainThread(() -> registerTaskManagerFuture2.complete(
                            getSlotManager().registerTaskManager(
                                    taskExecutionConnection2,
                                    new SlotReport(),
                                    LARGE_TOTAL_RESOURCE_PROFILE,
                                    LARGE_SLOT_RESOURCE_PROFILE)));
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registerTaskManagerFuture2))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getTaskManagerTracker().getPendingTaskManagers()).hasSize(1);
                    runInMainThread(() -> registerTaskManagerFuture3.complete(
                            getSlotManager().registerTaskManager(
                                    taskExecutionConnection3,
                                    new SlotReport(),
                                    FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                    FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)));
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registerTaskManagerFuture3))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getTaskManagerTracker().getPendingTaskManagers()).isEmpty();
                });
            }
        };
    }

    /**
     * Reports a slot status for an instance id that was never registered and checks that the
     * slot manager returns {@code false} and still has zero registered slots.
     *
     * @throws Exception if the test context fails
     */
    @Test
    void testReceivingUnknownSlotReport() throws Exception {
        final InstanceID unknownInstanceID = new InstanceID();
        final SlotReport unknownSlotReport = new SlotReport();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    Assertions.assertThat(getSlotManager().getNumberRegisteredSlots()).isEqualTo(0);
                    final CompletableFuture<Boolean> reportSlotFuture = new CompletableFuture<>();
                    runInMainThread(() -> reportSlotFuture.complete(
                            getSlotManager().reportSlotStatus(unknownInstanceID, unknownSlotReport)));
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(reportSlotFuture))
                            .isFalse();
                    Assertions.assertThat(getSlotManager().getNumberRegisteredSlots()).isEqualTo(0);
                });
            }
        };
    }

    /**
     * Stubs the allocation strategy so that it places one default-sized slot for {@code jobId} on
     * the registered task manager, then checks that the task executor gateway receives a slot
     * request whose {@code f1} is that job id and whose {@code f3} is
     * {@code DEFAULT_SLOT_RESOURCE_PROFILE}.
     *
     * @throws Exception if the test context fails
     */
    @Test
    void testSlotAllocationAccordingToStrategyResult() throws Exception {
        // Only f1 and f3 are inspected below, so the tuple is kept wildcard-typed.
        final CompletableFuture<Tuple6<?, ?, ?, ?, ?, ?>> requestSlotFuture = new CompletableFuture<>();
        final TestingTaskExecutorGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setRequestSlotFunction(tuple6 -> {
                            requestSlotFuture.complete(tuple6);
                            return CompletableFuture.completedFuture(Acknowledge.get());
                        })
                        .createTestingTaskExecutorGateway();
        final TaskExecutorConnection taskManagerConnection =
                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
        final JobID jobId = new JobID();
        final SlotReport slotReport = new SlotReport();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
                        (jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                ResourceAllocationResult.builder()
                                        .addAllocationOnRegisteredResource(
                                                jobId,
                                                taskManagerConnection.getInstanceID(),
                                                FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                                        .build());
                runTest(() -> {
                    runInMainThread(() -> {
                        getSlotManager().registerTaskManager(
                                taskManagerConnection,
                                slotReport,
                                FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                        getSlotManager().processResourceRequirements(
                                FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 1));
                    });
                    final Tuple6<?, ?, ?, ?, ?, ?> requestSlot =
                            FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(requestSlotFuture);
                    Assertions.assertThat(requestSlot.f1).isEqualTo(jobId);
                    Assertions.assertThat(requestSlot.f3)
                            .isEqualTo(FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                });
            }
        };
    }

    /**
     * Stubs the allocation strategy so that it always requests one pending task manager, then
     * checks that processing the requirements leads to exactly one non-empty resource
     * declaration: the first future completes, the second does not, and the counter is 1.
     *
     * @throws Exception if the test context fails
     */
    @Test
    void testRequestNewResourcesAccordingToStrategyResult() throws Exception {
        final JobID jobId = new JobID();
        final AtomicInteger requestCount = new AtomicInteger(0);
        // One future per expected/unexpected non-empty declaration, indexed by request count.
        final List<CompletableFuture<Void>> allocateResourceFutures = new ArrayList<>();
        allocateResourceFutures.add(new CompletableFuture<>());
        allocateResourceFutures.add(new CompletableFuture<>());
        new FineGrainedSlotManagerTestBase.Context() {
            {
                final PendingTaskManager pendingTaskManager =
                        new PendingTaskManager(FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE, 2);
                resourceAllocatorBuilder.setDeclareResourceNeededConsumer(resourceDeclarations -> {
                    Assertions.assertThat(requestCount.get()).isLessThan(2);
                    if (!resourceDeclarations.isEmpty()) {
                        allocateResourceFutures.get(requestCount.getAndIncrement()).complete(null);
                    }
                });
                resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
                        (jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                ResourceAllocationResult.builder()
                                        .addPendingTaskManagerAllocate(pendingTaskManager)
                                        .addAllocationOnPendingResource(
                                                jobId,
                                                pendingTaskManager.getPendingTaskManagerId(),
                                                FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                                        .build());
                runTest(() -> {
                    runInMainThread(() -> getSlotManager().processResourceRequirements(
                            FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 1)));
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(allocateResourceFutures.get(0));
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(allocateResourceFutures.get(1));
                    Assertions.assertThat(requestCount.get()).isEqualTo(1);
                });
            }
        };
    }

    /**
     * Stubs the allocation strategy so that it places a slot for {@code jobId} on a pending task
     * manager. After a non-empty resource declaration has been observed, a matching task manager
     * is registered, and the test checks that it receives a slot request whose {@code f1} is the
     * job id and whose {@code f3} is {@code DEFAULT_SLOT_RESOURCE_PROFILE}.
     *
     * @throws Exception if the test context fails
     */
    @Test
    void testSlotAllocationForPendingTaskManagerWillBeRespected() throws Exception {
        final JobID jobId = new JobID();
        final CompletableFuture<Void> requestResourceFuture = new CompletableFuture<>();
        final PendingTaskManager pendingTaskManager = new PendingTaskManager(DEFAULT_TOTAL_RESOURCE_PROFILE, 2);
        // Only f1 and f3 are inspected below, so the tuple is kept wildcard-typed.
        final CompletableFuture<Tuple6<?, ?, ?, ?, ?, ?>> requestSlotFuture = new CompletableFuture<>();
        final TestingTaskExecutorGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setRequestSlotFunction(tuple6 -> {
                            requestSlotFuture.complete(tuple6);
                            return CompletableFuture.completedFuture(Acknowledge.get());
                        })
                        .createTestingTaskExecutorGateway();
        final TaskExecutorConnection taskManagerConnection =
                new TaskExecutorConnection(ResourceID.generate(), taskExecutorGateway);
        new FineGrainedSlotManagerTestBase.Context() {
            {
                resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
                        (jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                ResourceAllocationResult.builder()
                                        .addPendingTaskManagerAllocate(pendingTaskManager)
                                        .addAllocationOnPendingResource(
                                                jobId,
                                                pendingTaskManager.getPendingTaskManagerId(),
                                                FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                                        .build());
                resourceAllocatorBuilder.setDeclareResourceNeededConsumer(resourceDeclarations -> {
                    if (!resourceDeclarations.isEmpty()) {
                        requestResourceFuture.complete(null);
                    }
                });
                runTest(() -> {
                    runInMainThread(() -> getSlotManager().processResourceRequirements(
                            FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 1)));
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(requestResourceFuture);
                    runInMainThread(() -> getSlotManager().registerTaskManager(
                            taskManagerConnection,
                            new SlotReport(),
                            FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                            FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE));
                    final Tuple6<?, ?, ?, ?, ?, ?> requestSlot =
                            FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(requestSlotFuture);
                    Assertions.assertThat(requestSlot.f1).isEqualTo(jobId);
                    Assertions.assertThat(requestSlot.f3)
                            .isEqualTo(FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                });
            }
        };
    }

    /**
     * Runs the shared "not enough resources" scenario without the notification grace period.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    void testNotificationAboutNotEnoughResources() throws Exception {
        final boolean withNotificationGracePeriod = false;
        testNotificationAboutNotEnoughResources(withNotificationGracePeriod);
    }

    /**
     * Runs the shared "not enough resources" scenario with the notification grace period enabled.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    void testGracePeriodForNotificationAboutNotEnoughResources() throws Exception {
        final boolean withNotificationGracePeriod = true;
        testNotificationAboutNotEnoughResources(withNotificationGracePeriod);
    }

    /**
     * Shared scenario: the stubbed strategy marks {@code jobId} as unfulfillable, and the test
     * checks that the resource event listener is notified exactly once for that job.
     *
     * <p>With the grace period, {@code setFailUnfulfillableRequest(false)} is applied before the
     * requirements are processed. The test then expects no notification until
     * {@code setFailUnfulfillableRequest(true)} is applied.
     *
     * @param withNotificationGracePeriod whether to start with failing of unfulfillable requests
     *     switched off and switch it on only after the requirements were processed
     * @throws Exception if the test context fails
     */
    private void testNotificationAboutNotEnoughResources(final boolean withNotificationGracePeriod) throws Exception {
        final JobID jobId = new JobID();
        // Only the job id (f0) of each notification is inspected; the payload type is left open.
        final List<Tuple2<JobID, ?>> notEnoughResourceNotifications = new ArrayList<>();
        final CompletableFuture<Void> notifyNotEnoughResourceFuture = new CompletableFuture<>();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                resourceEventListenerBuilder.setNotEnoughResourceAvailableConsumer((jobId1, acquiredResources) -> {
                    notEnoughResourceNotifications.add(Tuple2.of(jobId1, acquiredResources));
                    notifyNotEnoughResourceFuture.complete(null);
                });
                resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
                        (jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                ResourceAllocationResult.builder().addUnfulfillableJob(jobId).build());
                runTest(() -> {
                    if (withNotificationGracePeriod) {
                        runInMainThread(() -> getSlotManager().setFailUnfulfillableRequest(false));
                    }
                    final ResourceRequirements resourceRequirements =
                            FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 1);
                    runInMainThread(() -> getSlotManager().processResourceRequirements(resourceRequirements));
                    if (withNotificationGracePeriod) {
                        FineGrainedSlotManagerTestBase.assertFutureNotComplete(notifyNotEnoughResourceFuture);
                        Assertions.assertThat(notEnoughResourceNotifications).isEmpty();
                        runInMainThread(() -> getSlotManager().setFailUnfulfillableRequest(true));
                    }
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(notifyNotEnoughResourceFuture);
                    Assertions.assertThat(notEnoughResourceNotifications).hasSize(1);
                    final Tuple2<JobID, ?> notification = notEnoughResourceNotifications.get(0);
                    Assertions.assertThat(notification.f0).isEqualTo(jobId);
                });
            }
        };
    }

    /**
     * Checks that several requirement-changing events arriving within the requirement check delay
     * lead to a single strategy invocation.
     *
     * <p>The stubbed strategy completes the first future on its first invocation and the second
     * future on any later one. Two requirement updates plus one task manager registration are
     * submitted in one main-thread call. The test expects only the first future to complete,
     * even after waiting twice the delay, and the second to complete only after a further
     * requirement update.
     *
     * <p>The test is skipped via an assumption when the initial main-thread call takes longer
     * than the configured delay.
     *
     * @throws Exception if the test context fails
     */
    @Test
    void testRequirementCheckOnlyTriggeredOnce() throws Exception {
        new FineGrainedSlotManagerTestBase.Context() {
            {
                final List<CompletableFuture<Void>> checkRequirementFutures = new ArrayList<>();
                checkRequirementFutures.add(new CompletableFuture<>());
                checkRequirementFutures.add(new CompletableFuture<>());
                final Duration requirementCheckDelay = Duration.ofMillis(50L);
                resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction((ignored1, ignored2) -> {
                    if (checkRequirementFutures.get(0).isDone()) {
                        checkRequirementFutures.get(1).complete(null);
                    } else {
                        checkRequirementFutures.get(0).complete(null);
                    }
                    return ResourceAllocationResult.builder().build();
                });
                slotManagerConfigurationBuilder.setRequirementCheckDelay(requirementCheckDelay);
                runTest(() -> {
                    final ResourceRequirements resourceRequirements1 =
                            FineGrainedSlotManagerTestBase.createResourceRequirementsForSingleSlot();
                    final ResourceRequirements resourceRequirements2 =
                            FineGrainedSlotManagerTestBase.createResourceRequirementsForSingleSlot();
                    final ResourceRequirements resourceRequirements3 =
                            FineGrainedSlotManagerTestBase.createResourceRequirementsForSingleSlot();
                    final TaskExecutorConnection taskExecutionConnection =
                            FineGrainedSlotManagerTestBase.createTaskExecutorConnection();
                    final CompletableFuture<Void> registrationFuture = new CompletableFuture<>();
                    final long start = System.nanoTime();
                    runInMainThread(() -> {
                        getSlotManager().processResourceRequirements(resourceRequirements1);
                        getSlotManager().processResourceRequirements(resourceRequirements2);
                        getSlotManager().registerTaskManager(
                                taskExecutionConnection,
                                new SlotReport(),
                                FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE);
                        registrationFuture.complete(null);
                    });
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registrationFuture);
                    final long registrationTime = (System.nanoTime() - start) / 1000000L;
                    // Skip (rather than fail) when the machine was too slow for the timing
                    // premise of this test to hold.
                    Assumptions.assumeThat(registrationTime < requirementCheckDelay.toMillis())
                            .as("The time of process requirement and register task manager must not take longer than the requirement check delay. If it does, then this indicates a very slow machine.")
                            .isTrue();
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(checkRequirementFutures.get(0));
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(checkRequirementFutures.get(1));
                    // Wait well past the delay to make sure no second check was scheduled.
                    Thread.sleep(requirementCheckDelay.toMillis() * 2L);
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(checkRequirementFutures.get(1));
                    runInMainThread(() -> getSlotManager().processResourceRequirements(resourceRequirements3));
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(checkRequirementFutures.get(1));
                });
            }
        };
    }

    /**
     * Checks the idle-timeout handling of a task manager, with the timeout configured to 50 ms.
     *
     * <p>A task manager is registered with one allocated slot; its idle-since value is expected to
     * be {@code Long.MAX_VALUE}. After the slot is freed, the idle-since value must change and the
     * resource allocator must receive a single declaration with zero needed workers and exactly
     * one unwanted worker, namely this task manager's instance id. The two slots stay registered
     * until the task manager is explicitly unregistered, after which the count is zero.
     *
     * @throws Exception if the test context fails
     */
    @Test
    void testTimeoutForUnusedTaskManager() throws Exception {
        final Time taskManagerTimeout = Time.milliseconds(50L);
        final CompletableFuture<InstanceID> releaseResourceFuture = new CompletableFuture<>();
        final AllocationID allocationId = new AllocationID();
        final TaskExecutorConnection taskExecutionConnection = createTaskExecutorConnection();
        final InstanceID instanceId = taskExecutionConnection.getInstanceID();
        new FineGrainedSlotManagerTestBase.Context() {
            {
                resourceAllocatorBuilder.setDeclareResourceNeededConsumer(resourceDeclarations -> {
                    Assertions.assertThat(resourceDeclarations.size()).isEqualTo(1);
                    final ResourceDeclaration resourceDeclaration = resourceDeclarations.iterator().next();
                    Assertions.assertThat(resourceDeclaration.getNumNeeded()).isEqualTo(0);
                    Assertions.assertThat(resourceDeclaration.getUnwantedWorkers().size()).isEqualTo(1);
                    releaseResourceFuture.complete(resourceDeclaration.getUnwantedWorkers().iterator().next());
                });
                slotManagerConfigurationBuilder.setTaskManagerTimeout(taskManagerTimeout);
                runTest(() -> {
                    final CompletableFuture<SlotManager.RegistrationResult> registerTaskManagerFuture =
                            new CompletableFuture<>();
                    runInMainThread(() -> registerTaskManagerFuture.complete(
                            getSlotManager().registerTaskManager(
                                    taskExecutionConnection,
                                    new SlotReport(
                                            FineGrainedSlotManagerTestBase.createAllocatedSlotStatus(
                                                    allocationId,
                                                    FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)),
                                    FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                    FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)));
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registerTaskManagerFuture))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getSlotManager().getTaskManagerIdleSince(instanceId))
                            .isEqualTo(Long.MAX_VALUE);
                    final CompletableFuture<Long> idleSinceFuture = new CompletableFuture<>();
                    // Free the slot and read the idle timestamp in the same main-thread call.
                    runInMainThread(() -> {
                        getSlotManager().freeSlot(
                                new SlotID(taskExecutionConnection.getResourceID(), 0), allocationId);
                        idleSinceFuture.complete(getSlotManager().getTaskManagerIdleSince(instanceId));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(idleSinceFuture))
                            .isNotEqualTo(Long.MAX_VALUE);
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(releaseResourceFuture))
                            .isEqualTo(instanceId);
                    Assertions.assertThat(getSlotManager().getNumberRegisteredSlots()).isEqualTo(2);
                    final CompletableFuture<Boolean> unregisterTaskManagerFuture = new CompletableFuture<>();
                    runInMainThread(() -> unregisterTaskManagerFuture.complete(
                            getSlotManager().unregisterTaskManager(
                                    taskExecutionConnection.getInstanceID(),
                                    FineGrainedSlotManagerTestBase.TEST_EXCEPTION)));
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(unregisterTaskManagerFuture))
                            .isTrue();
                    Assertions.assertThat(getSlotManager().getNumberRegisteredSlots()).isEqualTo(0);
                });
            }
        };
    }

    /**
     * Runs the two shared max-total-resource scenarios (allocate and register) with the maximum
     * total CPU set to 1.5 times the CPU cores of {@code DEFAULT_TOTAL_RESOURCE_PROFILE}.
     *
     * @throws Exception if either shared scenario fails
     */
    @Test
    void testMaxTotalResourceCpuExceeded() throws Exception {
        final Consumer<SlotManagerConfigurationBuilder> cpuLimitSetter = configBuilder -> {
            final CPUResource cpuLimit =
                    (CPUResource)DEFAULT_TOTAL_RESOURCE_PROFILE.getCpuCores().multiply(BigDecimal.valueOf(1.5));
            configBuilder.setMaxTotalCpu(cpuLimit);
        };
        testMaxTotalResourceExceededAllocateResource(cpuLimitSetter);
        testMaxTotalResourceExceededRegisterResource(cpuLimitSetter);
    }

    /**
     * Registers two task managers, each with a total resource of twice its slot profile and one
     * slot already allocated, and checks the resource overview of the slot manager.
     *
     * <ul>
     *   <li>The free resource per task manager is expected to be one slot profile, and the overall
     *       free resource the merge of both.
     *   <li>The registered resource per task manager is expected to be twice its slot profile, and
     *       the overall registered resource twice the merge of both.
     * </ul>
     *
     * @throws Exception if the test context fails
     */
    @Test
    void testGetResourceOverview() throws Exception {
        final TaskExecutorConnection taskExecutorConnection1 = createTaskExecutorConnection();
        final TaskExecutorConnection taskExecutorConnection2 = createTaskExecutorConnection();
        final ResourceID resourceId1 = ResourceID.generate();
        final ResourceID resourceId2 = ResourceID.generate();
        final SlotID slotId1 = new SlotID(resourceId1, 0);
        final SlotID slotId2 = new SlotID(resourceId2, 0);
        final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1.0, 10);
        final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2.0, 20);
        final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile1, new JobID(), new AllocationID());
        final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile2, new JobID(), new AllocationID());
        final SlotReport slotReport1 = new SlotReport(slotStatus1);
        final SlotReport slotReport2 = new SlotReport(slotStatus2);
        new FineGrainedSlotManagerTestBase.Context() {
            {
                runTest(() -> {
                    final CompletableFuture<SlotManager.RegistrationResult> registerTaskManagerFuture1 =
                            new CompletableFuture<>();
                    final CompletableFuture<SlotManager.RegistrationResult> registerTaskManagerFuture2 =
                            new CompletableFuture<>();
                    runInMainThread(() -> {
                        registerTaskManagerFuture1.complete(
                                getSlotManager().registerTaskManager(
                                        taskExecutorConnection1,
                                        slotReport1,
                                        resourceProfile1.multiply(2),
                                        resourceProfile1));
                        registerTaskManagerFuture2.complete(
                                getSlotManager().registerTaskManager(
                                        taskExecutorConnection2,
                                        slotReport2,
                                        resourceProfile2.multiply(2),
                                        resourceProfile2));
                    });
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registerTaskManagerFuture1))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registerTaskManagerFuture2))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(getSlotManager().getFreeResource())
                            .isEqualTo(resourceProfile1.merge(resourceProfile2));
                    Assertions.assertThat(getSlotManager().getFreeResourceOf(taskExecutorConnection1.getInstanceID()))
                            .isEqualTo(resourceProfile1);
                    Assertions.assertThat(getSlotManager().getFreeResourceOf(taskExecutorConnection2.getInstanceID()))
                            .isEqualTo(resourceProfile2);
                    Assertions.assertThat(getSlotManager().getRegisteredResource())
                            .isEqualTo(resourceProfile1.merge(resourceProfile2).multiply(2));
                    Assertions.assertThat(getSlotManager().getRegisteredResourceOf(taskExecutorConnection1.getInstanceID()))
                            .isEqualTo(resourceProfile1.multiply(2));
                    Assertions.assertThat(getSlotManager().getRegisteredResourceOf(taskExecutorConnection2.getInstanceID()))
                            .isEqualTo(resourceProfile2.multiply(2));
                });
            }
        };
    }

    /**
     * Runs the allocate-resource and register-resource limit scenarios with the maximum total
     * memory set to 1.5 times the total memory of {@code DEFAULT_TOTAL_RESOURCE_PROFILE}.
     *
     * @throws Exception if either scenario fails to run
     */
    @Test
    void testMaxTotalResourceMemoryExceeded() throws Exception {
        final Consumer<SlotManagerConfigurationBuilder> memoryLimitSetter = builder -> {
            builder.setMaxTotalMem(DEFAULT_TOTAL_RESOURCE_PROFILE.getTotalMemory().multiply(1.5));
        };
        testMaxTotalResourceExceededAllocateResource(memoryLimitSetter);
        testMaxTotalResourceExceededRegisterResource(memoryLimitSetter);
    }

    /**
     * Checks that, with the given maximum-total-resource setting applied, processing resource
     * requirements leads to exactly one non-empty resource declaration even though the
     * allocation strategy proposes two pending task managers.
     *
     * <p>The declare-resource-needed consumer completes one future per non-empty declaration (and
     * asserts it is invoked fewer than three times); the test expects the first future to
     * complete and the second to stay incomplete.
     *
     * @param maxTotalResourceSetter applies the resource limit under test to the slot manager
     *     configuration builder
     * @throws Exception if the test context fails to run
     */
    private void testMaxTotalResourceExceededAllocateResource(final Consumer<SlotManagerConfigurationBuilder> maxTotalResourceSetter) throws Exception {
        final JobID jobId = new JobID();
        final AtomicInteger requestCount = new AtomicInteger(0);
        final List<CompletableFuture<Void>> allocateResourceFutures = new ArrayList<>();
        allocateResourceFutures.add(new CompletableFuture<>());
        allocateResourceFutures.add(new CompletableFuture<>());
        final PendingTaskManager pendingTaskManager1 = new PendingTaskManager(DEFAULT_TOTAL_RESOURCE_PROFILE, 2);
        final PendingTaskManager pendingTaskManager2 = new PendingTaskManager(DEFAULT_TOTAL_RESOURCE_PROFILE, 2);
        new FineGrainedSlotManagerTestBase.Context(){
            {
                maxTotalResourceSetter.accept(this.slotManagerConfigurationBuilder);
                this.resourceAllocatorBuilder.setDeclareResourceNeededConsumer(resourceDeclarations -> {
                    if (!resourceDeclarations.isEmpty()) {
                        // Guard the index used below: only two futures were prepared.
                        Assertions.assertThat(requestCount.get()).isLessThan(2);
                        allocateResourceFutures.get(requestCount.getAndIncrement()).complete(null);
                    }
                });
                // The strategy always asks for both pending task managers, one slot on each.
                this.resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
                        (jobIDCollectionMap, taskManagerResourceInfoProvider) ->
                                ResourceAllocationResult.builder()
                                        .addPendingTaskManagerAllocate(pendingTaskManager1)
                                        .addPendingTaskManagerAllocate(pendingTaskManager2)
                                        .addAllocationOnPendingResource(jobId, pendingTaskManager1.getPendingTaskManagerId(), FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                                        .addAllocationOnPendingResource(jobId, pendingTaskManager2.getPendingTaskManagerId(), FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)
                                        .build());
                this.runTest(() -> {
                    this.runInMainThread(() -> this.getSlotManager().processResourceRequirements(FineGrainedSlotManagerTestBase.createResourceRequirements(jobId, 2)));
                    FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(allocateResourceFutures.get(0));
                    FineGrainedSlotManagerTestBase.assertFutureNotComplete(allocateResourceFutures.get(1));
                });
            }
        };
    }

    /**
     * Checks that, with the given maximum-total-resource setting applied, a second task manager
     * registration is rejected.
     *
     * <p>Two task managers register one after the other, each with
     * {@code DEFAULT_TOTAL_RESOURCE_PROFILE} and an empty slot report. The first registration is
     * expected to return {@code SUCCESS} and be tracked; the second is expected to return
     * {@code REJECTED} and leave the tracker with a single registered task manager.
     *
     * @param maxTotalResourceSetter applies the resource limit under test to the slot manager
     *     configuration builder
     * @throws Exception if the test context fails to run
     */
    private void testMaxTotalResourceExceededRegisterResource(final Consumer<SlotManagerConfigurationBuilder> maxTotalResourceSetter) throws Exception {
        final TaskExecutorConnection taskManagerConnection1 = FineGrainedSlotManagerTest.createTaskExecutorConnection();
        final TaskExecutorConnection taskManagerConnection2 = FineGrainedSlotManagerTest.createTaskExecutorConnection();
        final CompletableFuture<SlotManager.RegistrationResult> registerTaskManagerFuture1 = new CompletableFuture<>();
        final CompletableFuture<SlotManager.RegistrationResult> registerTaskManagerFuture2 = new CompletableFuture<>();
        new FineGrainedSlotManagerTestBase.Context(){
            {
                maxTotalResourceSetter.accept(this.slotManagerConfigurationBuilder);
                this.runTest(() -> {
                    // First registration: expected to be accepted and tracked.
                    this.runInMainThread(() -> registerTaskManagerFuture1.complete(
                            this.getSlotManager().registerTaskManager(
                                    taskManagerConnection1,
                                    new SlotReport(),
                                    FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                    FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)));
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registerTaskManagerFuture1))
                            .isEqualTo(SlotManager.RegistrationResult.SUCCESS);
                    Assertions.assertThat(this.getTaskManagerTracker().getRegisteredTaskManagers()).hasSize(1);
                    Assertions.assertThat(this.getTaskManagerTracker().getRegisteredTaskManager(taskManagerConnection1.getInstanceID())).isPresent();

                    // Second registration: expected to be rejected and leave the tracker unchanged.
                    this.runInMainThread(() -> registerTaskManagerFuture2.complete(
                            this.getSlotManager().registerTaskManager(
                                    taskManagerConnection2,
                                    new SlotReport(),
                                    FineGrainedSlotManagerTestBase.DEFAULT_TOTAL_RESOURCE_PROFILE,
                                    FineGrainedSlotManagerTestBase.DEFAULT_SLOT_RESOURCE_PROFILE)));
                    Assertions.assertThat(FineGrainedSlotManagerTestBase.assertFutureCompleteAndReturn(registerTaskManagerFuture2))
                            .isEqualTo(SlotManager.RegistrationResult.REJECTED);
                    Assertions.assertThat(this.getTaskManagerTracker().getRegisteredTaskManagers()).hasSize(1);
                    Assertions.assertThat(this.getTaskManagerTracker().getRegisteredTaskManager(taskManagerConnection2.getInstanceID())).isNotPresent();
                });
            }
        };
    }

    /**
     * Runs {@code testAccessMetricValueDuringItsUnregister} with {@code SlotManager::suspend} as
     * the shutdown action.
     *
     * @throws Exception if the delegated scenario fails to run
     */
    @Test
    void testMetricsUnregisteredWhenSuspending() throws Exception {
        // The parameter type supplies the target type; no raw-type cast is needed.
        this.testAccessMetricValueDuringItsUnregister(SlotManager::suspend);
    }

    /**
     * Runs {@code testAccessMetricValueDuringItsUnregister} with {@code AutoCloseable::close} as
     * the shutdown action.
     *
     * @throws Exception if the delegated scenario fails to run
     */
    @Test
    void testMetricsUnregisteredWhenClosing() throws Exception {
        // The parameter type supplies the target type; no raw-type cast is needed.
        this.testAccessMetricValueDuringItsUnregister(AutoCloseable::close);
    }

    /**
     * Checks that applying {@code closeFn} to the slot manager throws no exception and leaves no
     * metrics registered.
     *
     * <p>A counter is incremented on every metric registration and decremented on every
     * unregistration. The test first confirms that at least one metric was registered, then runs
     * {@code closeFn} in the main thread and expects the counter to be back at zero.
     *
     * @param closeFn the shutdown action to apply to the slot manager
     * @throws Exception if the test context fails to run
     */
    private void testAccessMetricValueDuringItsUnregister(ThrowingConsumer<SlotManager, Exception> closeFn) throws Exception {
        final AtomicInteger registeredMetrics = new AtomicInteger();
        final TestingMetricRegistry metricRegistry = TestingMetricRegistry.builder()
                .setRegisterConsumer((a, b, c) -> registeredMetrics.incrementAndGet())
                .setUnregisterConsumer((a, b, c) -> registeredMetrics.decrementAndGet())
                .build();
        // NOTE(review): the explicit `this` argument looks like a decompiler rendering of the
        // enclosing-instance parameter; if Context is an inner class this should read
        // `new Context()` - confirm against FineGrainedSlotManagerTestBase.
        final FineGrainedSlotManagerTestBase.Context context = new FineGrainedSlotManagerTestBase.Context(this);
        context.setSlotManagerMetricGroup(SlotManagerMetricGroup.create(metricRegistry, "localhost"));
        context.runTest(() -> {
            // Sanity check: the scenario is meaningless unless metrics were registered at all.
            Assertions.assertThat(registeredMetrics.get()).isGreaterThan(0);
            context.runInMainThreadAndWait(
                    () -> Assertions.assertThatNoException().isThrownBy(() -> closeFn.accept(context.getSlotManager())));
            Assertions.assertThat(registeredMetrics.get()).isEqualTo(0);
        });
    }
}

