/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.allocator.AllocatorUtil;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAssigner;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

public class DefaultSlotAssigner
implements SlotAssigner {
    @VisibleForTesting
    static final String APPLICATION_MODE_EXECUTION_TARGET = "embedded";
    @Nullable
    private final String executionTarget;
    private final boolean minimalTaskManagerPreferred;

    DefaultSlotAssigner(@Nullable String executionTarget, boolean minimalTaskManagerPreferred) {
        this.executionTarget = executionTarget;
        this.minimalTaskManagerPreferred = minimalTaskManagerPreferred;
    }

    @Override
    public Collection<JobSchedulingPlan.SlotAssignment> assignSlots(JobInformation jobInformation, Collection<? extends SlotInfo> freeSlots, VertexParallelism vertexParallelism, JobAllocationsInformation previousAllocations) {
        AllocatorUtil.checkMinimumRequiredSlots(jobInformation, freeSlots);
        ArrayList<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> allGroups = new ArrayList<SlotSharingSlotAllocator.ExecutionSlotSharingGroup>();
        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
            allGroups.addAll(AllocatorUtil.createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup));
        }
        Collection<? extends SlotInfo> pickedSlots = this.pickSlotsIfNeeded(allGroups.size(), freeSlots);
        Iterator<? extends SlotInfo> iterator = pickedSlots.iterator();
        ArrayList<JobSchedulingPlan.SlotAssignment> assignments = new ArrayList<JobSchedulingPlan.SlotAssignment>();
        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroup group : allGroups) {
            assignments.add(new JobSchedulingPlan.SlotAssignment(iterator.next(), group));
        }
        return assignments;
    }

    @VisibleForTesting
    Collection<? extends SlotInfo> pickSlotsIfNeeded(int requestExecutionSlotSharingGroups, Collection<? extends SlotInfo> freeSlots) {
        Collection<? extends SlotInfo> pickedSlots = freeSlots;
        if (APPLICATION_MODE_EXECUTION_TARGET.equalsIgnoreCase(this.executionTarget) && this.minimalTaskManagerPreferred && freeSlots.size() > requestExecutionSlotSharingGroups) {
            Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsPerTaskExecutor = this.getSlotsPerTaskExecutor(freeSlots);
            pickedSlots = this.pickSlotsInMinimalTaskExecutors(slotsPerTaskExecutor, requestExecutionSlotSharingGroups);
        }
        return pickedSlots;
    }

    private Iterator<TaskManagerLocation> getSortedTaskExecutors(Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsPerTaskExecutor) {
        Comparator taskExecutorComparator = (leftTml, rightTml) -> Integer.compare(((Set)slotsPerTaskExecutor.get(rightTml)).size(), ((Set)slotsPerTaskExecutor.get(leftTml)).size());
        return slotsPerTaskExecutor.keySet().stream().sorted(taskExecutorComparator).iterator();
    }

    private Collection<? extends SlotInfo> pickSlotsInMinimalTaskExecutors(Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> slotsByTaskExecutor, int requestedGroups) {
        ArrayList<? extends SlotInfo> pickedSlots = new ArrayList<SlotInfo>();
        Iterator<TaskManagerLocation> sortedTaskExecutors = this.getSortedTaskExecutors(slotsByTaskExecutor);
        while (pickedSlots.size() < requestedGroups) {
            Set<? extends SlotInfo> slotInfos = slotsByTaskExecutor.get(sortedTaskExecutors.next());
            pickedSlots.addAll(slotInfos);
        }
        return pickedSlots;
    }

    private Map<TaskManagerLocation, ? extends Set<? extends SlotInfo>> getSlotsPerTaskExecutor(Collection<? extends SlotInfo> slots) {
        return slots.stream().collect(Collectors.groupingBy(SlotInfo::getTaskManagerLocation, Collectors.mapping(Function.identity(), Collectors.toSet())));
    }
}

