/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.slotpool.PendingRequest;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.ResourceRequestPreMappings;
import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public enum TasksBalancedRequestSlotMatchingStrategy implements RequestSlotMatchingStrategy
{
    INSTANCE;

    public static final Logger LOG;

    @Override
    public Collection<RequestSlotMatchingStrategy.RequestSlotMatch> matchRequestsAndSlots(Collection<? extends PhysicalSlot> slots, Collection<PendingRequest> pendingRequests, Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
        ResourceRequestPreMappings resourceRequestPreMappings = ResourceRequestPreMappings.createFrom(pendingRequests, slots);
        if (!resourceRequestPreMappings.isMatchingFulfilled()) {
            return Collections.emptyList();
        }
        ArrayList<RequestSlotMatchingStrategy.RequestSlotMatch> resultingMatches = new ArrayList<RequestSlotMatchingStrategy.RequestSlotMatch>();
        List<PendingRequest> sortedRequests = WeightLoadable.sortByLoadingDescend(pendingRequests);
        TasksBalancedRequestSlotMatchingStrategy.logDebugInfo(slots, taskExecutorsLoad, sortedRequests);
        Collection slotElements = slots.stream().map(PhysicalSlotElement::new).collect(Collectors.toList());
        Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> profileSlots = this.getSlotCandidatesByProfile(slotElements, taskExecutorsLoad);
        Map<ResourceID, Set<PhysicalSlotElement>> taskExecutorSlots = this.groupSlotsByTaskExecutor(slotElements);
        for (PendingRequest request : sortedRequests) {
            ResourceProfile requestProfile = request.getResourceProfile();
            Optional<PhysicalSlotElement> bestSlotEleOpt = this.tryMatchPhysicalSlot(request, profileSlots, taskExecutorsLoad, resourceRequestPreMappings);
            if (!bestSlotEleOpt.isPresent()) continue;
            PhysicalSlotElement slotElement = bestSlotEleOpt.get();
            TasksBalancedRequestSlotMatchingStrategy.updateTaskExecutorsLoad(taskExecutorsLoad, request, slotElement);
            this.updateReferenceRemainingSlots(profileSlots, taskExecutorSlots, slotElement);
            resourceRequestPreMappings.decrease(requestProfile, slotElement.getResourceProfile());
            resultingMatches.add(RequestSlotMatchingStrategy.RequestSlotMatch.createFor(request, slotElement.physicalSlot));
        }
        return resultingMatches;
    }

    private static void updateTaskExecutorsLoad(Map<ResourceID, LoadingWeight> taskExecutorsLoad, PendingRequest request, PhysicalSlotElement slotElement) {
        taskExecutorsLoad.compute(slotElement.getResourceID(), (ignoredId, oldLoading) -> Objects.isNull(oldLoading) ? request.getLoading() : oldLoading.merge(request.getLoading()));
    }

    private static void logDebugInfo(Collection<? extends PhysicalSlot> slots, Map<ResourceID, LoadingWeight> taskExecutorsLoad, List<PendingRequest> sortedRequests) {
        LOG.debug("Available slots: {}, sortedRequests: {}, taskExecutorsLoad: {}", new Object[]{slots, sortedRequests, taskExecutorsLoad});
    }

    private Map<ResourceID, Set<PhysicalSlotElement>> groupSlotsByTaskExecutor(Collection<PhysicalSlotElement> slotElements) {
        return slotElements.stream().collect(Collectors.groupingBy(PhysicalSlotElement::getResourceID, Collectors.toSet()));
    }

    private Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> getSlotCandidatesByProfile(Collection<PhysicalSlotElement> slotElements, Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
        HashMap<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> result = new HashMap<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>>();
        PhysicalSlotElementPriorityComparator physicalSlotElementPriorityComparator = new PhysicalSlotElementPriorityComparator(taskExecutorsLoad);
        for (PhysicalSlotElement slotEle : slotElements) {
            result.compute(slotEle.getResourceProfile(), (resourceProfile, oldSlots) -> {
                HeapPriorityQueue<PhysicalSlotElement> values = Objects.isNull(oldSlots) ? new HeapPriorityQueue<PhysicalSlotElement>(physicalSlotElementPriorityComparator, 8) : oldSlots;
                values.add(slotEle);
                return values;
            });
        }
        return result;
    }

    private Optional<PhysicalSlotElement> tryMatchPhysicalSlot(PendingRequest request, Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> profileToSlotMap, Map<ResourceID, LoadingWeight> taskExecutorsLoad, ResourceRequestPreMappings resourceRequestPreMappings) {
        ResourceProfile requestProfile = request.getResourceProfile();
        Set candidateProfiles = profileToSlotMap.keySet().stream().filter(slotProfile -> slotProfile.isMatching(requestProfile) && resourceRequestPreMappings.hasAvailableProfile(requestProfile, (ResourceProfile)slotProfile)).collect(Collectors.toSet());
        return candidateProfiles.stream().map(candidateProfile -> {
            HeapPriorityQueue slots = (HeapPriorityQueue)profileToSlotMap.get(candidateProfile);
            return Objects.isNull(slots) ? null : (PhysicalSlotElement)slots.peek();
        }).filter(Objects::nonNull).min(new PhysicalSlotElementComparator(taskExecutorsLoad));
    }

    private void updateReferenceRemainingSlots(Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> profileSlots, Map<ResourceID, Set<PhysicalSlotElement>> taskExecutorSlots, PhysicalSlotElement targetSlotElement) {
        ResourceID tmID = targetSlotElement.getResourceID();
        Set<PhysicalSlotElement> slotToReSort = taskExecutorSlots.remove(tmID);
        for (PhysicalSlotElement slotEle : slotToReSort) {
            HeapPriorityQueue<PhysicalSlotElement> slotsOfProfile = profileSlots.get(slotEle.getResourceProfile());
            slotsOfProfile.remove(slotEle);
            if (slotEle.equals(targetSlotElement)) continue;
            slotsOfProfile.add(slotEle);
        }
        slotToReSort.remove(targetSlotElement);
        taskExecutorSlots.put(tmID, slotToReSort);
    }

    public String toString() {
        return TasksBalancedRequestSlotMatchingStrategy.class.getSimpleName();
    }

    static {
        LOG = LoggerFactory.getLogger(TasksBalancedRequestSlotMatchingStrategy.class);
    }

    static final class PhysicalSlotElementPriorityComparator
    implements PriorityComparator<PhysicalSlotElement> {
        private final PhysicalSlotElementComparator physicalSlotElementComparator;

        PhysicalSlotElementPriorityComparator(Map<ResourceID, LoadingWeight> taskExecutorsLoading) {
            this.physicalSlotElementComparator = new PhysicalSlotElementComparator(taskExecutorsLoading);
        }

        @Override
        public int comparePriority(PhysicalSlotElement left, PhysicalSlotElement right) {
            return this.physicalSlotElementComparator.compare(left, right);
        }
    }

    static final class PhysicalSlotElement
    extends AbstractHeapPriorityQueueElement {
        private final PhysicalSlot physicalSlot;

        public PhysicalSlotElement(PhysicalSlot physicalSlot) {
            this.physicalSlot = physicalSlot;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o instanceof PhysicalSlotElement) {
                return this.physicalSlot.equals(((PhysicalSlotElement)o).physicalSlot);
            }
            return false;
        }

        public ResourceID getResourceID() {
            return this.physicalSlot.getTaskManagerLocation().getResourceID();
        }

        public ResourceProfile getResourceProfile() {
            return this.physicalSlot.getResourceProfile();
        }

        public int hashCode() {
            return this.physicalSlot.hashCode();
        }
    }

    static final class PhysicalSlotElementComparator
    implements Comparator<PhysicalSlotElement> {
        private final Map<ResourceID, LoadingWeight> taskExecutorsLoading;

        PhysicalSlotElementComparator(Map<ResourceID, LoadingWeight> taskExecutorsLoading) {
            this.taskExecutorsLoading = Preconditions.checkNotNull(taskExecutorsLoading);
        }

        @Override
        public int compare(PhysicalSlotElement left, PhysicalSlotElement right) {
            LoadingWeight leftLoad = this.taskExecutorsLoading.getOrDefault(left.getResourceID(), DefaultLoadingWeight.EMPTY);
            LoadingWeight rightLoad = this.taskExecutorsLoading.getOrDefault(right.getResourceID(), DefaultLoadingWeight.EMPTY);
            return leftLoad.compareTo(rightLoad);
        }
    }
}

