/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotMatchingResolver;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotTaskExecutorWeight;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotsUtilization;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;

public enum SlotsBalancedSlotMatchingResolver implements SlotMatchingResolver
{
    INSTANCE;


    @Override
    public Collection<JobSchedulingPlan.SlotAssignment> matchSlotSharingGroupWithSlots(Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> requestGroups, Collection<PhysicalSlot> freeSlots) {
        ArrayList<JobSchedulingPlan.SlotAssignment> slotAssignments = new ArrayList<JobSchedulingPlan.SlotAssignment>(requestGroups.size());
        Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor = AllocatorUtil.getSlotsPerTaskExecutor(freeSlots);
        Map<ResourceID, SlotsUtilization> taskExecutorSlotsUtilizations = this.getSlotsUtilizationView(slotsPerTaskExecutor);
        TreeMap<Double, Set<PhysicalSlot>> utilizationSlotsMap = SlotsBalancedSlotMatchingResolver.getUtilizationSlotsMap(freeSlots, taskExecutorSlotsUtilizations);
        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroup requestGroup : requestGroups) {
            SlotTaskExecutorWeight<SlotsUtilization> best = SlotsBalancedSlotMatchingResolver.getTheBestSlotUtilization(utilizationSlotsMap, taskExecutorSlotsUtilizations);
            ResourceID resourceID = best.getResourceID();
            slotAssignments.add(new JobSchedulingPlan.SlotAssignment(best.physicalSlot, requestGroup));
            SlotsUtilization oldSlotsUtilization = taskExecutorSlotsUtilizations.get(resourceID);
            SlotsUtilization newSlotsUtilization = oldSlotsUtilization.incReserved(1);
            taskExecutorSlotsUtilizations.put(resourceID, newSlotsUtilization);
            SlotsBalancedSlotMatchingResolver.updateSlotsPerTaskExecutor(slotsPerTaskExecutor, best);
            Set<PhysicalSlot> slotInfos = slotsPerTaskExecutor.get(best.getResourceID());
            SlotsBalancedSlotMatchingResolver.updateUtilizationSlotsMap(utilizationSlotsMap, best, slotInfos, newSlotsUtilization);
        }
        return slotAssignments;
    }

    private Map<ResourceID, SlotsUtilization> getSlotsUtilizationView(Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor) {
        return slotsPerTaskExecutor.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> new SlotsUtilization(((Set)entry.getValue()).size(), 0)));
    }

    private static void updateUtilizationSlotsMap(Map<Double, Set<PhysicalSlot>> utilizationSlotsMap, SlotTaskExecutorWeight<SlotsUtilization> best, Set<PhysicalSlot> slotsToAdjust, SlotsUtilization newSlotsUtilization) {
        Double oldUtilization = ((SlotsUtilization)best.taskExecutorWeight).getUtilization();
        Double newUtilization = newSlotsUtilization.getUtilization();
        Set<PhysicalSlot> physicalSlots = utilizationSlotsMap.get(oldUtilization);
        if (Objects.nonNull(physicalSlots)) {
            physicalSlots.remove(best.physicalSlot);
            if (Objects.nonNull(slotsToAdjust)) {
                physicalSlots.removeAll(slotsToAdjust);
            }
        }
        if (CollectionUtil.isNullOrEmpty(physicalSlots)) {
            utilizationSlotsMap.remove(oldUtilization);
        }
        if (Objects.nonNull(slotsToAdjust)) {
            utilizationSlotsMap.computeIfAbsent(newUtilization, slotsUtilization -> new HashSet()).addAll(slotsToAdjust);
        }
    }

    private static void updateSlotsPerTaskExecutor(Map<ResourceID, Set<PhysicalSlot>> slotsPerTaskExecutor, SlotTaskExecutorWeight<SlotsUtilization> best) {
        Set<PhysicalSlot> slotInfos = slotsPerTaskExecutor.get(best.getResourceID());
        if (Objects.nonNull(slotInfos)) {
            slotInfos.remove(best.physicalSlot);
        }
        if (Objects.isNull(slotInfos) || slotInfos.isEmpty()) {
            slotsPerTaskExecutor.remove(best.getResourceID());
        }
    }

    private static TreeMap<Double, Set<PhysicalSlot>> getUtilizationSlotsMap(Collection<PhysicalSlot> slots, Map<ResourceID, SlotsUtilization> slotsUtilizations) {
        return slots.stream().collect(Collectors.groupingBy(physicalSlot -> ((SlotsUtilization)slotsUtilizations.get(physicalSlot.getTaskManagerLocation().getResourceID())).getUtilization(), TreeMap::new, Collectors.toSet()));
    }

    private static SlotTaskExecutorWeight<SlotsUtilization> getTheBestSlotUtilization(TreeMap<Double, Set<PhysicalSlot>> slotsByUtilization, Map<ResourceID, SlotsUtilization> taskExecutorSlotsUtilizations) {
        Map.Entry<Double, Set<PhysicalSlot>> firstEntry = slotsByUtilization.firstEntry();
        if (firstEntry == null || firstEntry.getKey() == null || CollectionUtil.isNullOrEmpty((Collection)firstEntry.getValue())) {
            throw (FlinkRuntimeException)NO_SLOTS_EXCEPTION_GETTER.get();
        }
        PhysicalSlot slot = firstEntry.getValue().iterator().next();
        ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
        SlotsUtilization slotsUtilization = taskExecutorSlotsUtilizations.get(resourceID);
        return new SlotTaskExecutorWeight<SlotsUtilization>(slotsUtilization, slot);
    }
}

