/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.resources.ExternalResource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultResourceAllocationStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingTaskManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingTaskManagerId;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocationResult;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceReconcileResult;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerInfo;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerResourceInfoProvider;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingTaskManagerInfo;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingTaskManagerResourceInfoProvider;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.util.ResourceCounter;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link DefaultResourceAllocationStrategy}.
 *
 * <p>Every strategy built by this class is configured with task managers consisting of
 * {@link #NUM_OF_SLOTS} slots of {@link #DEFAULT_SLOT_RESOURCE} each. Requirement-fulfilling tests
 * call {@code tryFulfillRequirements}; release/reserve tests call
 * {@code tryReconcileClusterResources}.
 */
class DefaultResourceAllocationStrategyTest {
    /** Profile of a single default slot, built from {@code fromResources(1.0, 100)}. */
    private static final ResourceProfile DEFAULT_SLOT_RESOURCE =
            ResourceProfile.fromResources(1.0, 100);

    /** Number of default slots per task manager used by every strategy created here. */
    private static final int NUM_OF_SLOTS = 5;

    /** Strategy using {@code TaskManagerLoadBalanceMode.NONE} and no redundant task managers. */
    private static final DefaultResourceAllocationStrategy ANY_MATCHING_STRATEGY =
            createStrategy(TaskManagerOptions.TaskManagerLoadBalanceMode.NONE);

    /** Strategy using {@code TaskManagerLoadBalanceMode.SLOTS} and no redundant task managers. */
    private static final DefaultResourceAllocationStrategy EVENLY_STRATEGY =
            createStrategy(TaskManagerOptions.TaskManagerLoadBalanceMode.SLOTS);

    /**
     * One registered task manager (10 default slots) must absorb one 8-slot requirement plus two
     * {@code UNKNOWN} requirements without touching pending resources.
     */
    @Test
    void testFulfillRequirementWithRegisteredResources() {
        final TestingTaskManagerInfo taskManager = createTaskManager(10, 10);
        final JobID jobId = new JobID();
        final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(8);
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Collections.singleton(taskManager))
                        .build();

        final List<ResourceRequirement> requirements = new ArrayList<>();
        requirements.add(ResourceRequirement.create(largeResource, 1));
        requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 2));

        final ResourceAllocationResult result =
                ANY_MATCHING_STRATEGY.tryFulfillRequirements(
                        Collections.singletonMap(jobId, requirements),
                        provider,
                        resourceID -> false);

        Assertions.assertThat(result.getUnfulfillableJobs()).isEmpty();
        Assertions.assertThat(result.getAllocationsOnPendingResources()).isEmpty();
        Assertions.assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();

        // The UNKNOWN requirements are expected to be counted under the default slot profile.
        final ResourceCounter allocated =
                result.getAllocationsOnRegisteredResources()
                        .get(jobId)
                        .get(taskManager.getInstanceId());
        Assertions.assertThat(allocated.getResourceCount(DEFAULT_SLOT_RESOURCE)).isEqualTo(2);
        Assertions.assertThat(allocated.getResourceCount(largeResource)).isEqualTo(1);
    }

    /**
     * With the SLOTS load-balance mode, four 5-slot requirements and two {@code UNKNOWN}
     * requirements must be spread over three registered task managers so that each one receives
     * exactly two allocations, one of which is a large one.
     */
    @Test
    void testFulfillRequirementWithRegisteredResourcesEvenly() {
        final TestingTaskManagerInfo taskManager1 = createTaskManager(10, 10);
        final TestingTaskManagerInfo taskManager2 = createTaskManager(10, 10);
        final TestingTaskManagerInfo taskManager3 = createTaskManager(10, 10);
        final JobID jobId = new JobID();
        final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(5);
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Arrays.asList(taskManager1, taskManager2, taskManager3))
                        .build();

        final List<ResourceRequirement> requirements = new ArrayList<>();
        requirements.add(ResourceRequirement.create(largeResource, 4));
        requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 2));

        final ResourceAllocationResult result =
                EVENLY_STRATEGY.tryFulfillRequirements(
                        Collections.singletonMap(jobId, requirements),
                        provider,
                        resourceID -> false);

        Assertions.assertThat(result.getUnfulfillableJobs()).isEmpty();
        Assertions.assertThat(result.getAllocationsOnPendingResources()).isEmpty();
        Assertions.assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
        Assertions.assertThat(result.getAllocationsOnRegisteredResources().get(jobId).values())
                .allSatisfy(
                        resourceCounter ->
                                Assertions.assertThat(resourceCounter.getTotalResourceCount())
                                        .isEqualTo(2));
        Assertions.assertThat(result.getAllocationsOnRegisteredResources().get(jobId).values())
                .allSatisfy(
                        resourceCounter ->
                                Assertions.assertThat(
                                                resourceCounter.containsResource(largeResource))
                                        .isTrue());
    }

    /**
     * Two {@code UNKNOWN} requirements against two 2-slot pending task managers must end up on a
     * single pending task manager (so the other one stays free), even in SLOTS mode.
     */
    @Test
    void testExcessPendingResourcesCouldReleaseEvenly() {
        final JobID jobId = new JobID();
        // The supplier deliberately creates fresh pending task managers on every invocation,
        // exactly as the fixture has always done.
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setPendingTaskManagersSupplier(
                                () ->
                                        Arrays.asList(
                                                createPendingTaskManager(2),
                                                createPendingTaskManager(2)))
                        .build();

        final List<ResourceRequirement> requirements = new ArrayList<>();
        requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 2));

        final ResourceAllocationResult result =
                EVENLY_STRATEGY.tryFulfillRequirements(
                        Collections.singletonMap(jobId, requirements),
                        provider,
                        resourceID -> false);

        Assertions.assertThat(result.getUnfulfillableJobs()).isEmpty();
        Assertions.assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
        Assertions.assertThat(result.getAllocationsOnPendingResources()).hasSize(1);
    }

    /** Runs {@link #testSpecialResourcesRequirementCouldFulfilled} with the SLOTS strategy. */
    @Test
    void testSpecialResourcesRequirementCouldFulfilledEvenly() {
        testSpecialResourcesRequirementCouldFulfilled(EVENLY_STRATEGY);
    }

    /** Runs {@link #testSpecialResourcesRequirementCouldFulfilled} with the NONE strategy. */
    @Test
    void testSpecialResourcesRequirementCouldFulfilledAnyMatching() {
        testSpecialResourcesRequirementCouldFulfilled(ANY_MATCHING_STRATEGY);
    }

    /**
     * Verifies that a requirement carrying an external resource is placed on the only registered
     * task manager whose profile carries that external resource.
     *
     * @param strategy the strategy under test
     */
    void testSpecialResourcesRequirementCouldFulfilled(DefaultResourceAllocationStrategy strategy) {
        final ResourceProfile extendedResourceProfile =
                ResourceProfile.newBuilder(DEFAULT_SLOT_RESOURCE)
                        .setExtendedResource(new ExternalResource("customResource", 1.0))
                        .build();
        final TestingTaskManagerInfo extendedTaskManager =
                new TestingTaskManagerInfo(
                        extendedResourceProfile.multiply(2),
                        extendedResourceProfile.multiply(1),
                        extendedResourceProfile);
        final TestingTaskManagerInfo defaultTaskManager = createTaskManager(2, 2);
        final JobID jobId = new JobID();
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Arrays.asList(defaultTaskManager, extendedTaskManager))
                        .build();

        final List<ResourceRequirement> requirements = new ArrayList<>();
        requirements.add(ResourceRequirement.create(extendedResourceProfile, 1));

        final ResourceAllocationResult result =
                strategy.tryFulfillRequirements(
                        Collections.singletonMap(jobId, requirements),
                        provider,
                        resourceID -> false);

        Assertions.assertThat(result.getUnfulfillableJobs()).isEmpty();
        Assertions.assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
        Assertions.assertThat(result.getAllocationsOnRegisteredResources()).hasSize(1);
        Assertions.assertThat(result.getAllocationsOnRegisteredResources().get(jobId).keySet())
                .satisfiesExactly(
                        instanceId ->
                                Assertions.assertThat(instanceId)
                                        .isEqualTo(extendedTaskManager.getInstanceId()));
    }

    /**
     * Two 3-slot requirements plus four {@code UNKNOWN} requirements do not fit into the single
     * existing pending task manager, so exactly one more must be requested; together the two
     * pending task managers must hold every requirement.
     */
    @Test
    void testFulfillRequirementWithPendingResources() {
        final JobID jobId = new JobID();
        final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(3);
        final PendingTaskManager pendingTaskManager = createPendingTaskManager(NUM_OF_SLOTS);
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setPendingTaskManagersSupplier(
                                () -> Collections.singleton(pendingTaskManager))
                        .build();

        final List<ResourceRequirement> requirements = new ArrayList<>();
        requirements.add(ResourceRequirement.create(largeResource, 2));
        requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 4));

        final ResourceAllocationResult result =
                ANY_MATCHING_STRATEGY.tryFulfillRequirements(
                        Collections.singletonMap(jobId, requirements),
                        provider,
                        resourceID -> false);

        Assertions.assertThat(result.getUnfulfillableJobs()).isEmpty();
        Assertions.assertThat(result.getAllocationsOnRegisteredResources()).isEmpty();
        Assertions.assertThat(result.getPendingTaskManagersToAllocate()).hasSize(1);

        final PendingTaskManagerId newAllocated =
                result.getPendingTaskManagersToAllocate().get(0).getPendingTaskManagerId();

        // Sum up what was placed on the pre-existing and on the newly requested task manager.
        ResourceCounter allFulfilledRequirements = ResourceCounter.empty();
        for (PendingTaskManagerId pendingId :
                Arrays.asList(pendingTaskManager.getPendingTaskManagerId(), newAllocated)) {
            for (Map.Entry<ResourceProfile, Integer> resourceWithCount :
                    result.getAllocationsOnPendingResources()
                            .get(pendingId)
                            .get(jobId)
                            .getResourcesWithCount()) {
                allFulfilledRequirements =
                        allFulfilledRequirements.add(
                                resourceWithCount.getKey(), resourceWithCount.getValue());
            }
        }

        Assertions.assertThat(allFulfilledRequirements.getResourceCount(DEFAULT_SLOT_RESOURCE))
                .isEqualTo(4);
        Assertions.assertThat(allFulfilledRequirements.getResourceCount(largeResource))
                .isEqualTo(2);
    }

    /**
     * An 8-slot requirement is larger than a whole task manager ({@link #NUM_OF_SLOTS} slots), so
     * the job must be reported as unfulfillable and nothing new may be requested.
     */
    @Test
    void testUnfulfillableRequirement() {
        final TestingTaskManagerInfo taskManager = createTaskManager(NUM_OF_SLOTS, NUM_OF_SLOTS);
        final JobID jobId = new JobID();
        final ResourceProfile unfulfillableResource = DEFAULT_SLOT_RESOURCE.multiply(8);
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Collections.singleton(taskManager))
                        .build();

        final List<ResourceRequirement> requirements = new ArrayList<>();
        requirements.add(ResourceRequirement.create(unfulfillableResource, 1));

        final ResourceAllocationResult result =
                ANY_MATCHING_STRATEGY.tryFulfillRequirements(
                        Collections.singletonMap(jobId, requirements),
                        provider,
                        resourceID -> false);

        Assertions.assertThat(result.getUnfulfillableJobs()).containsExactly(jobId);
        Assertions.assertThat(result.getPendingTaskManagersToAllocate()).isEmpty();
    }

    /**
     * When the only registered task manager is reported as blocked, ten {@code UNKNOWN}
     * requirements must be served entirely by two newly requested pending task managers.
     */
    @Test
    void testBlockedTaskManagerCannotFulfillRequirements() {
        final TestingTaskManagerInfo registeredTaskManager =
                createTaskManager(NUM_OF_SLOTS, NUM_OF_SLOTS);
        final JobID jobId = new JobID();
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Collections.singleton(registeredTaskManager))
                        .build();

        final List<ResourceRequirement> requirements = new ArrayList<>();
        requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 10));

        // The checker flags exactly the resource id of the registered task manager as blocked.
        final ResourceAllocationResult result =
                ANY_MATCHING_STRATEGY.tryFulfillRequirements(
                        Collections.singletonMap(jobId, requirements),
                        provider,
                        resourceId ->
                                registeredTaskManager
                                        .getTaskExecutorConnection()
                                        .getResourceID()
                                        .equals(resourceId));

        Assertions.assertThat(result.getUnfulfillableJobs()).isEmpty();
        Assertions.assertThat(result.getAllocationsOnRegisteredResources()).isEmpty();
        Assertions.assertThat(result.getPendingTaskManagersToAllocate()).hasSize(2);
    }

    /**
     * A registered task manager is only released by reconciliation once its idle timestamp has
     * been moved into the past.
     */
    @Test
    void testIdleTaskManagerShouldBeReleased() {
        final TestingTaskManagerInfo registeredTaskManager =
                createTaskManager(NUM_OF_SLOTS, NUM_OF_SLOTS);
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Collections.singleton(registeredTaskManager))
                        .build();

        final ResourceReconcileResult beforeIdle =
                ANY_MATCHING_STRATEGY.tryReconcileClusterResources(provider);
        Assertions.assertThat(beforeIdle.getTaskManagersToRelease()).isEmpty();

        registeredTaskManager.setIdleSince(System.currentTimeMillis() - 10L);

        final ResourceReconcileResult afterIdle =
                ANY_MATCHING_STRATEGY.tryReconcileClusterResources(provider);
        Assertions.assertThat(afterIdle.getTaskManagersToRelease())
                .containsExactly(registeredTaskManager);
    }

    /** A pending task manager without any pending allocation must be released. */
    @Test
    void testIdlePendingTaskManagerShouldBeReleased() {
        final PendingTaskManager pendingTaskManager =
                new PendingTaskManager(DEFAULT_SLOT_RESOURCE, 1);
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setPendingTaskManagersSupplier(
                                () -> Collections.singleton(pendingTaskManager))
                        .build();

        final ResourceReconcileResult result =
                ANY_MATCHING_STRATEGY.tryReconcileClusterResources(provider);

        Assertions.assertThat(result.getPendingTaskManagersToRelease())
                .containsExactly(pendingTaskManager);
    }

    /** A pending task manager that carries a pending allocation must not be released. */
    @Test
    void testUsedPendingTaskManagerShouldNotBeReleased() {
        final PendingTaskManager pendingTaskManager =
                new PendingTaskManager(DEFAULT_SLOT_RESOURCE, 1);
        pendingTaskManager.replaceAllPendingAllocations(
                Collections.singletonMap(
                        new JobID(), ResourceCounter.withResource(DEFAULT_SLOT_RESOURCE, 1)));
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setPendingTaskManagersSupplier(
                                () -> Collections.singleton(pendingTaskManager))
                        .build();

        final ResourceReconcileResult result =
                ANY_MATCHING_STRATEGY.tryReconcileClusterResources(provider);

        Assertions.assertThat(result.getPendingTaskManagersToRelease()).isEmpty();
    }

    /**
     * With one redundant task manager configured, a 4-slot requirement that fits on the registered
     * task manager must still trigger one additional pending task manager, and nothing is
     * allocated on pending resources.
     */
    @Test
    void testFulFillRequirementShouldTakeRedundantInAccount() {
        final DefaultResourceAllocationStrategy strategy = createStrategy(1);
        final TestingTaskManagerInfo taskManager = createTaskManager(NUM_OF_SLOTS, NUM_OF_SLOTS);
        final JobID jobId = new JobID();
        final ResourceProfile largeResource = DEFAULT_SLOT_RESOURCE.multiply(4);
        final List<ResourceRequirement> requirements =
                Collections.singletonList(ResourceRequirement.create(largeResource, 1));
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Collections.singleton(taskManager))
                        .build();

        final ResourceAllocationResult result =
                strategy.tryFulfillRequirements(
                        Collections.singletonMap(jobId, requirements),
                        provider,
                        resourceID -> false);

        Assertions.assertThat(result.getUnfulfillableJobs()).isEmpty();
        Assertions.assertThat(result.getPendingTaskManagersToAllocate()).hasSize(1);
        Assertions.assertThat(result.getAllocationsOnPendingResources()).isEmpty();
    }

    /**
     * With one redundant task manager configured and both an in-use registered and an in-use
     * pending task manager present, the idle registered and the idle pending task manager must
     * both be released.
     */
    @Test
    void testUnusedResourcesShouldBeReleasedIfNonIdleResourceIsEnough() {
        final TestingTaskManagerInfo taskManagerInUse = createTaskManager(NUM_OF_SLOTS, 2);
        final TestingTaskManagerInfo taskManagerIdle =
                createTaskManager(NUM_OF_SLOTS, NUM_OF_SLOTS);
        taskManagerIdle.setIdleSince(System.currentTimeMillis() - 10L);

        final PendingTaskManager pendingTaskManagerInUse = createPendingTaskManager(NUM_OF_SLOTS);
        pendingTaskManagerInUse.replaceAllPendingAllocations(
                Collections.singletonMap(
                        new JobID(), ResourceCounter.withResource(DEFAULT_SLOT_RESOURCE, 2)));
        final PendingTaskManager pendingTaskManagerIdle = createPendingTaskManager(NUM_OF_SLOTS);

        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Arrays.asList(taskManagerInUse, taskManagerIdle))
                        .setPendingTaskManagersSupplier(
                                () -> Arrays.asList(pendingTaskManagerInUse, pendingTaskManagerIdle))
                        .build();

        final ResourceReconcileResult result =
                createStrategy(1).tryReconcileClusterResources(provider);

        Assertions.assertThat(result.getPendingTaskManagersToRelease())
                .containsExactly(pendingTaskManagerIdle);
        Assertions.assertThat(result.getTaskManagersToRelease()).containsExactly(taskManagerIdle);
    }

    /**
     * With four redundant task managers configured, reconciliation must keep the idle registered
     * and idle pending task managers and request two more pending task managers.
     */
    @Test
    void testRedundantResourceShouldBeFulfilled() {
        final TestingTaskManagerInfo taskManagerInUse = createTaskManager(NUM_OF_SLOTS, 2);
        final TestingTaskManagerInfo taskManagerIdle =
                createTaskManager(NUM_OF_SLOTS, NUM_OF_SLOTS);
        taskManagerIdle.setIdleSince(System.currentTimeMillis() - 10L);
        final PendingTaskManager pendingTaskManagerIdle = createPendingTaskManager(NUM_OF_SLOTS);

        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Arrays.asList(taskManagerInUse, taskManagerIdle))
                        .setPendingTaskManagersSupplier(
                                () -> Collections.singletonList(pendingTaskManagerIdle))
                        .build();

        final ResourceReconcileResult result =
                createStrategy(4).tryReconcileClusterResources(provider);

        Assertions.assertThat(result.getPendingTaskManagersToRelease()).isEmpty();
        Assertions.assertThat(result.getTaskManagersToRelease()).isEmpty();
        Assertions.assertThat(result.getPendingTaskManagersToAllocate()).hasSize(2);
    }

    /**
     * With one redundant task manager configured, reconciliation must release the idle pending
     * task manager but keep the idle registered one.
     */
    @Test
    void testRedundantResourceShouldBeReserved() {
        final TestingTaskManagerInfo taskManagerInUse = createTaskManager(NUM_OF_SLOTS, 2);
        final TestingTaskManagerInfo taskManagerIdle =
                createTaskManager(NUM_OF_SLOTS, NUM_OF_SLOTS);
        taskManagerIdle.setIdleSince(System.currentTimeMillis() - 10L);
        final PendingTaskManager pendingTaskManagerIdle = createPendingTaskManager(NUM_OF_SLOTS);

        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Arrays.asList(taskManagerInUse, taskManagerIdle))
                        .setPendingTaskManagersSupplier(
                                () -> Collections.singletonList(pendingTaskManagerIdle))
                        .build();

        final ResourceReconcileResult result =
                createStrategy(1).tryReconcileClusterResources(provider);

        Assertions.assertThat(result.getPendingTaskManagersToRelease())
                .containsExactly(pendingTaskManagerIdle);
        Assertions.assertThat(result.getTaskManagersToRelease()).isEmpty();
    }

    /**
     * Minimum CPU worth 4.5 task managers: reconciliation on top of three existing task managers
     * must request two more, and on an empty cluster five.
     */
    @Test
    void testMinRequiredCPULimitInTryReconcile() {
        final CPUResource minRequiredCPU = cpuOfTaskManagers(4.5);
        testMinResourceLimitInReconcile(minRequiredCPU, MemorySize.ZERO, 2);
        testMinResourceLimitInReconcileWithNoResource(minRequiredCPU, MemorySize.ZERO, 5);
    }

    /**
     * Minimum memory worth 3.5 task managers: reconciliation on top of three existing task
     * managers must request one more, and on an empty cluster four.
     */
    @Test
    void testMinRequiredMemoryLimitInTryReconcile() {
        final MemorySize minRequiredMemory = memoryOfTaskManagers(3.5);
        testMinResourceLimitInReconcile(new CPUResource(0.0), minRequiredMemory, 1);
        testMinResourceLimitInReconcileWithNoResource(new CPUResource(0.0), minRequiredMemory, 4);
    }

    /**
     * Minimum CPU worth 4.5 task managers with one pending task manager and no requirements:
     * four more pending task managers must be requested.
     */
    @Test
    void testMinRequiredCPULimitInTryFulfill() {
        testMinRequiredResourceLimitInFulfillRequirements(
                cpuOfTaskManagers(4.5), MemorySize.ZERO, 4);
    }

    /**
     * Same as {@link #testMinRequiredCPULimitInTryFulfill()} but with 15 default-slot
     * requirements; the number of newly requested pending task managers must still be four.
     */
    @Test
    void testMinRequiredCPULimitInTryFulfillWithRequirement() {
        final List<ResourceRequirement> resourceRequirements =
                Collections.singletonList(ResourceRequirement.create(DEFAULT_SLOT_RESOURCE, 15));
        testMinRequiredResourceLimitInFulfillRequirements(
                cpuOfTaskManagers(4.5), MemorySize.ZERO, 4, resourceRequirements);
    }

    /**
     * Minimum memory worth 3.5 task managers with one pending task manager and no requirements:
     * three more pending task managers must be requested.
     */
    @Test
    void testMinRequiredMemoryLimitInTryFulfill() {
        testMinRequiredResourceLimitInFulfillRequirements(
                new CPUResource(0.0), memoryOfTaskManagers(3.5), 3);
    }

    /**
     * Minimum memory worth 3.5 task managers combined with 15 default-slot requirements.
     *
     * <p>This test used to be a verbatim copy of
     * {@link #testMinRequiredCPULimitInTryFulfillWithRequirement()} and therefore never exercised
     * the memory limit. It now passes a zero CPU minimum and a memory minimum.
     *
     * <p>NOTE(review): the expected count of 3 is derived from the sibling expectations in this
     * class (the 15 requirements occupy the existing pending task manager plus two new ones, and
     * one further task manager is presumably needed to reach 3.5 task managers of memory) -
     * confirm by running against the strategy.
     */
    @Test
    void testMinRequiredMemoryLimitInTryFulfillWithRequirement() {
        final List<ResourceRequirement> resourceRequirements =
                Collections.singletonList(ResourceRequirement.create(DEFAULT_SLOT_RESOURCE, 15));
        testMinRequiredResourceLimitInFulfillRequirements(
                new CPUResource(0.0), memoryOfTaskManagers(3.5), 3, resourceRequirements);
    }

    /**
     * Reconciles a cluster with two registered task managers (one marked idle) and one pending
     * task manager under the given minimum limits, and checks that nothing is released and the
     * expected number of pending task managers is requested.
     *
     * @param minRequiredCPU minimum CPU passed to the strategy
     * @param minRequiredMemory minimum memory passed to the strategy
     * @param pendingTaskManagersToAllocate expected number of newly requested task managers
     */
    void testMinResourceLimitInReconcile(
            CPUResource minRequiredCPU,
            MemorySize minRequiredMemory,
            int pendingTaskManagersToAllocate) {
        final TestingTaskManagerInfo taskManagerInUse =
                createTaskManager(NUM_OF_SLOTS, NUM_OF_SLOTS);
        final TestingTaskManagerInfo taskManagerIdle =
                createTaskManager(NUM_OF_SLOTS, NUM_OF_SLOTS);
        taskManagerIdle.setIdleSince(System.currentTimeMillis() - 10L);
        final PendingTaskManager pendingTaskManager = createPendingTaskManager(NUM_OF_SLOTS);

        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Arrays.asList(taskManagerInUse, taskManagerIdle))
                        .setPendingTaskManagersSupplier(
                                () -> Collections.singletonList(pendingTaskManager))
                        .build();

        final ResourceReconcileResult result =
                createStrategy(minRequiredCPU, minRequiredMemory)
                        .tryReconcileClusterResources(provider);

        Assertions.assertThat(result.getPendingTaskManagersToRelease()).isEmpty();
        Assertions.assertThat(result.getTaskManagersToRelease()).isEmpty();
        Assertions.assertThat(result.getPendingTaskManagersToAllocate())
                .hasSize(pendingTaskManagersToAllocate);
    }

    /**
     * Reconciles a cluster without any registered or pending task manager under the given minimum
     * limits, and checks that nothing is released and the expected number of pending task
     * managers is requested.
     *
     * @param minRequiredCPU minimum CPU passed to the strategy
     * @param minRequiredMemory minimum memory passed to the strategy
     * @param pendingTaskManagersToAllocate expected number of newly requested task managers
     */
    void testMinResourceLimitInReconcileWithNoResource(
            CPUResource minRequiredCPU,
            MemorySize minRequiredMemory,
            int pendingTaskManagersToAllocate) {
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Arrays.asList(new TaskManagerInfo[0]))
                        .setPendingTaskManagersSupplier(
                                () -> Arrays.asList(new PendingTaskManager[0]))
                        .build();

        final ResourceReconcileResult result =
                createStrategy(minRequiredCPU, minRequiredMemory)
                        .tryReconcileClusterResources(provider);

        Assertions.assertThat(result.getPendingTaskManagersToRelease()).isEmpty();
        Assertions.assertThat(result.getTaskManagersToRelease()).isEmpty();
        Assertions.assertThat(result.getPendingTaskManagersToAllocate())
                .hasSize(pendingTaskManagersToAllocate);
    }

    /**
     * Variant of
     * {@link #testMinRequiredResourceLimitInFulfillRequirements(CPUResource, MemorySize, int,
     * List)} that uses an empty requirement list.
     *
     * @param minRequiredCPU minimum CPU passed to the strategy
     * @param minRequiredMemory minimum memory passed to the strategy
     * @param pendingTaskManagersToAllocate expected number of newly requested task managers
     */
    void testMinRequiredResourceLimitInFulfillRequirements(
            CPUResource minRequiredCPU,
            MemorySize minRequiredMemory,
            int pendingTaskManagersToAllocate) {
        testMinRequiredResourceLimitInFulfillRequirements(
                minRequiredCPU,
                minRequiredMemory,
                pendingTaskManagersToAllocate,
                Collections.emptyList());
    }

    /**
     * Fulfils the given requirements for a fresh job on a cluster that has no registered and
     * exactly one pending task manager, and checks how many pending task managers are requested.
     *
     * @param minRequiredCPU minimum CPU passed to the strategy
     * @param minRequiredMemory minimum memory passed to the strategy
     * @param pendingTaskManagersToAllocate expected number of newly requested task managers
     * @param requirements requirements of the single job
     */
    void testMinRequiredResourceLimitInFulfillRequirements(
            CPUResource minRequiredCPU,
            MemorySize minRequiredMemory,
            int pendingTaskManagersToAllocate,
            List<ResourceRequirement> requirements) {
        final JobID jobId = new JobID();
        final PendingTaskManager pendingTaskManager = createPendingTaskManager(NUM_OF_SLOTS);
        final TestingTaskManagerResourceInfoProvider provider =
                TestingTaskManagerResourceInfoProvider.newBuilder()
                        .setRegisteredTaskManagersSupplier(
                                () -> Arrays.asList(new TaskManagerInfo[0]))
                        .setPendingTaskManagersSupplier(
                                () -> Collections.singletonList(pendingTaskManager))
                        .build();

        final ResourceAllocationResult result =
                createStrategy(minRequiredCPU, minRequiredMemory)
                        .tryFulfillRequirements(
                                Collections.singletonMap(jobId, requirements),
                                provider,
                                resourceID -> false);

        Assertions.assertThat(result.getPendingTaskManagersToAllocate())
                .hasSize(pendingTaskManagersToAllocate);
    }

    /**
     * Builds a registered task manager whose three constructor arguments are the default slot
     * profile multiplied by {@code totalSlots}, multiplied by {@code availableSlots}, and the
     * default slot profile itself.
     *
     * <p>NOTE(review): the parameter names assume the constructor takes (total, available,
     * default slot) profiles - confirm against {@code TestingTaskManagerInfo}.
     */
    private static TestingTaskManagerInfo createTaskManager(int totalSlots, int availableSlots) {
        return new TestingTaskManagerInfo(
                DEFAULT_SLOT_RESOURCE.multiply(totalSlots),
                DEFAULT_SLOT_RESOURCE.multiply(availableSlots),
                DEFAULT_SLOT_RESOURCE);
    }

    /** Builds a pending task manager of {@code numSlots} default slots. */
    private static PendingTaskManager createPendingTaskManager(int numSlots) {
        return new PendingTaskManager(DEFAULT_SLOT_RESOURCE.multiply(numSlots), numSlots);
    }

    /** Returns the CPU of {@code numTaskManagers} task managers of {@link #NUM_OF_SLOTS} slots. */
    private static CPUResource cpuOfTaskManagers(double numTaskManagers) {
        return DEFAULT_SLOT_RESOURCE
                .getCpuCores()
                .multiply(NUM_OF_SLOTS)
                .multiply(BigDecimal.valueOf(numTaskManagers));
    }

    /**
     * Returns the total memory of {@code numTaskManagers} task managers of {@link #NUM_OF_SLOTS}
     * slots.
     */
    private static MemorySize memoryOfTaskManagers(double numTaskManagers) {
        return DEFAULT_SLOT_RESOURCE
                .getTotalMemory()
                .multiply((double) NUM_OF_SLOTS)
                .multiply(numTaskManagers);
    }

    /** Creates a strategy with the given load-balance mode and no redundant task managers. */
    private static DefaultResourceAllocationStrategy createStrategy(
            TaskManagerOptions.TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) {
        return createStrategy(taskManagerLoadBalanceMode, 0);
    }

    /** Creates a NONE-mode strategy with the given number of redundant task managers. */
    private static DefaultResourceAllocationStrategy createStrategy(int redundantTaskManagerNum) {
        return createStrategy(
                TaskManagerOptions.TaskManagerLoadBalanceMode.NONE, redundantTaskManagerNum);
    }

    /** Creates a strategy with the given mode and redundancy and zero minimum CPU/memory. */
    private static DefaultResourceAllocationStrategy createStrategy(
            TaskManagerOptions.TaskManagerLoadBalanceMode taskManagerLoadBalanceMode,
            int redundantTaskManagerNum) {
        return createStrategy(
                taskManagerLoadBalanceMode,
                redundantTaskManagerNum,
                new CPUResource(0.0),
                MemorySize.ZERO);
    }

    /** Creates a NONE-mode strategy without redundancy and with the given minimum CPU/memory. */
    private static DefaultResourceAllocationStrategy createStrategy(
            CPUResource minRequiredCPU, MemorySize minRequiredMemory) {
        return createStrategy(
                TaskManagerOptions.TaskManagerLoadBalanceMode.NONE,
                0,
                minRequiredCPU,
                minRequiredMemory);
    }

    /**
     * Single place where the strategy under test is constructed. The task manager profile is
     * always {@link #NUM_OF_SLOTS} default slots and the time argument is zero milliseconds
     * (presumably the idle timeout - confirm against the strategy's constructor).
     */
    private static DefaultResourceAllocationStrategy createStrategy(
            TaskManagerOptions.TaskManagerLoadBalanceMode taskManagerLoadBalanceMode,
            int redundantTaskManagerNum,
            CPUResource minRequiredCPU,
            MemorySize minRequiredMemory) {
        return new DefaultResourceAllocationStrategy(
                DEFAULT_SLOT_RESOURCE.multiply(NUM_OF_SLOTS),
                NUM_OF_SLOTS,
                taskManagerLoadBalanceMode,
                Time.milliseconds(0L),
                redundantTaskManagerNum,
                minRequiredCPU,
                minRequiredMemory);
    }
}

