/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.ExecutionSlotAssignment;
import org.apache.flink.runtime.scheduler.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.SharedSlot;
import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever;
import org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk;
import org.apache.flink.runtime.scheduler.SlotSharingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ExecutionSlotAllocator} that hands out logical slots from {@link SharedSlot}s.
 *
 * <p>Executions are grouped by their {@link ExecutionSlotSharingGroup} (as decided by the
 * configured {@link SlotSharingStrategy}). Every group is backed by exactly one {@link SharedSlot},
 * which in turn is backed by a single physical slot request issued to the
 * {@link PhysicalSlotProvider}. Shared slots are tracked in {@link #sharedSlots} until they are
 * released through {@link #releaseSharedSlot(ExecutionSlotSharingGroup)}.
 */
class SlotSharingExecutionSlotAllocator
implements ExecutionSlotAllocator {
    private static final Logger LOG = LoggerFactory.getLogger(SlotSharingExecutionSlotAllocator.class);
    private final PhysicalSlotProvider slotProvider;
    private final boolean slotWillBeOccupiedIndefinitely;
    private final SlotSharingStrategy slotSharingStrategy;
    /** Currently known shared slots; the map compares groups by reference, not by {@code equals}. */
    private final Map<ExecutionSlotSharingGroup, SharedSlot> sharedSlots;
    private final SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory;
    private final PhysicalSlotRequestBulkChecker bulkChecker;
    private final Duration allocationTimeout;
    private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;

    /**
     * Creates the allocator.
     *
     * @param slotProvider provider used to request and cancel physical slots
     * @param slotWillBeOccupiedIndefinitely flag forwarded to every physical slot request and
     *     every created {@link SharedSlot}
     * @param slotSharingStrategy maps an execution vertex to its slot sharing group
     * @param sharedSlotProfileRetrieverFactory creates the per-bulk slot profile retriever
     * @param bulkChecker schedules the timeout check for each bulk of pending requests
     * @param allocationTimeout timeout handed to the bulk checker
     * @param resourceProfileRetriever resolves the resource profile of a single execution vertex
     * @throws NullPointerException if any reference argument is {@code null}
     */
    SlotSharingExecutionSlotAllocator(PhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely, SlotSharingStrategy slotSharingStrategy, SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory, PhysicalSlotRequestBulkChecker bulkChecker, Duration allocationTimeout, Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever) {
        this.slotProvider = Preconditions.checkNotNull(slotProvider);
        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        this.slotSharingStrategy = Preconditions.checkNotNull(slotSharingStrategy);
        this.sharedSlotProfileRetrieverFactory = Preconditions.checkNotNull(sharedSlotProfileRetrieverFactory);
        this.bulkChecker = Preconditions.checkNotNull(bulkChecker);
        this.allocationTimeout = Preconditions.checkNotNull(allocationTimeout);
        this.resourceProfileRetriever = Preconditions.checkNotNull(resourceProfileRetriever);
        this.sharedSlots = new IdentityHashMap<>();
        // Timeouts are scheduled per bulk via the bulk checker (see allocateSlotsForVertices),
        // so the provider's own batch request timeout check is switched off here.
        this.slotProvider.disableBatchSlotRequestTimeoutCheck();
    }

    /**
     * Allocates a logical slot for every given execution attempt.
     *
     * @param executionAttemptIds attempts to allocate slots for; each must belong to a distinct
     *     execution vertex
     * @return the slot assignment of every attempt, keyed by attempt id
     * @throws IllegalStateException if two attempts refer to the same execution vertex
     */
    @Override
    public Map<ExecutionAttemptID, ExecutionSlotAssignment> allocateSlotsFor(List<ExecutionAttemptID> executionAttemptIds) {
        Map<ExecutionVertexID, ExecutionAttemptID> vertexIdToExecutionId = new HashMap<>();
        executionAttemptIds.forEach(executionId -> vertexIdToExecutionId.put(executionId.getExecutionVertexId(), executionId));
        // A smaller map means at least two attempts collapsed onto the same vertex id.
        Preconditions.checkState(vertexIdToExecutionId.size() == executionAttemptIds.size(), "SlotSharingExecutionSlotAllocator does not support one execution vertex to have multiple concurrent executions");
        List<ExecutionVertexID> vertexIds = executionAttemptIds.stream().map(ExecutionAttemptID::getExecutionVertexId).collect(Collectors.toList());
        return allocateSlotsForVertices(vertexIds).stream().collect(Collectors.toMap(
                vertexAssignment -> vertexIdToExecutionId.get(vertexAssignment.getExecutionVertexId()),
                vertexAssignment -> new ExecutionSlotAssignment(vertexIdToExecutionId.get(vertexAssignment.getExecutionVertexId()), vertexAssignment.getLogicalSlotFuture())));
    }

    /**
     * Allocates logical slots for the given execution vertices.
     *
     * <p>The steps are:
     * <ol>
     *   <li>group the vertices by their slot sharing group,</li>
     *   <li>reuse the shared slot of every group that already has one,</li>
     *   <li>request a new shared slot for every remaining group,</li>
     *   <li>allocate one logical slot per vertex from the group's shared slot,</li>
     *   <li>wrap all involved groups into a bulk and schedule its timeout check.</li>
     * </ol>
     *
     * @param executionVertexIds vertices to allocate slots for
     * @return the assignments, in the same order as {@code executionVertexIds}
     */
    private List<SlotExecutionVertexAssignment> allocateSlotsForVertices(List<ExecutionVertexID> executionVertexIds) {
        SharedSlotProfileRetriever sharedSlotProfileRetriever = sharedSlotProfileRetrieverFactory.createFromBulk(new HashSet<>(executionVertexIds));
        Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup = executionVertexIds.stream().collect(Collectors.groupingBy(slotSharingStrategy::getExecutionSlotSharingGroup));
        Map<ExecutionSlotSharingGroup, SharedSlot> slots = new HashMap<>(executionsByGroup.size());
        Set<ExecutionSlotSharingGroup> groupsToAssign = new HashSet<>(executionsByGroup.keySet());

        Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots = tryAssignExistingSharedSlots(groupsToAssign);
        slots.putAll(assignedSlots);
        groupsToAssign.removeAll(assignedSlots.keySet());

        if (!groupsToAssign.isEmpty()) {
            Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots = allocateSharedSlots(groupsToAssign, sharedSlotProfileRetriever);
            slots.putAll(allocatedSlots);
            groupsToAssign.removeAll(allocatedSlots.keySet());
            // Every group without an existing shared slot must have received a new one.
            Preconditions.checkState(groupsToAssign.isEmpty());
        }

        Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments = allocateLogicalSlotsFromSharedSlots(slots, executionsByGroup);
        SharingPhysicalSlotRequestBulk bulk = createBulk(slots, executionsByGroup);
        bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, allocationTimeout);
        return executionVertexIds.stream().map(assignments::get).collect(Collectors.toList());
    }

    /**
     * Cancels the logical slot request of the given execution attempt, without a cause.
     *
     * @param executionAttemptId attempt whose slot request is cancelled
     */
    @Override
    public void cancel(ExecutionAttemptID executionAttemptId) {
        cancelLogicalSlotRequest(executionAttemptId.getExecutionVertexId(), null);
    }

    /**
     * Cancels the logical slot request of a vertex on the shared slot of its group. If the group
     * currently has no shared slot, the call only logs at debug level.
     *
     * @param executionVertexId vertex whose logical slot request is cancelled
     * @param cause reason forwarded to the shared slot; may be {@code null}
     * @throws NullPointerException if the strategy knows no sharing group for the vertex
     */
    private void cancelLogicalSlotRequest(ExecutionVertexID executionVertexId, Throwable cause) {
        ExecutionSlotSharingGroup executionSlotSharingGroup = slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexId);
        Preconditions.checkNotNull(executionSlotSharingGroup, "There is no ExecutionSlotSharingGroup for ExecutionVertexID " + executionVertexId);
        SharedSlot slot = sharedSlots.get(executionSlotSharingGroup);
        if (slot != null) {
            slot.cancelLogicalSlotRequest(executionVertexId, cause);
        } else {
            LOG.debug("There is no SharedSlot for ExecutionSlotSharingGroup of ExecutionVertexID {}", executionVertexId);
        }
    }

    /**
     * Allocates one logical slot per execution vertex from the shared slot of its group.
     *
     * @param slots shared slot of every group; must contain an entry for each key of
     *     {@code executionsByGroup}
     * @param executionsByGroup vertices to allocate, grouped by slot sharing group
     * @return the assignment of every vertex, keyed by vertex id
     */
    private static Map<ExecutionVertexID, SlotExecutionVertexAssignment> allocateLogicalSlotsFromSharedSlots(Map<ExecutionSlotSharingGroup, SharedSlot> slots, Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executionsByGroup) {
        Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments = new HashMap<>();
        for (Map.Entry<ExecutionSlotSharingGroup, List<ExecutionVertexID>> entry : executionsByGroup.entrySet()) {
            ExecutionSlotSharingGroup group = entry.getKey();
            for (ExecutionVertexID executionId : entry.getValue()) {
                CompletableFuture<LogicalSlot> logicalSlotFuture = slots.get(group).allocateLogicalSlot(executionId);
                assignments.put(executionId, new SlotExecutionVertexAssignment(executionId, logicalSlotFuture));
            }
        }
        return assignments;
    }

    /**
     * Looks up the already existing shared slot of each given group.
     *
     * @param executionSlotSharingGroups groups to look up
     * @return the groups that already have a shared slot, mapped to that slot; groups without one
     *     are absent from the result
     */
    private Map<ExecutionSlotSharingGroup, SharedSlot> tryAssignExistingSharedSlots(Set<ExecutionSlotSharingGroup> executionSlotSharingGroups) {
        Map<ExecutionSlotSharingGroup, SharedSlot> assignedSlots = new HashMap<>(executionSlotSharingGroups.size());
        for (ExecutionSlotSharingGroup group : executionSlotSharingGroups) {
            SharedSlot sharedSlot = sharedSlots.get(group);
            if (sharedSlot != null) {
                assignedSlots.put(group, sharedSlot);
            }
        }
        return assignedSlots;
    }

    /**
     * Requests one physical slot per given group and wraps each pending request into a new
     * {@link SharedSlot}, which is also registered in {@link #sharedSlots}.
     *
     * @param executionSlotSharingGroups groups that need a new shared slot
     * @param sharedSlotProfileRetriever supplies the slot profile of each group's request
     * @return the newly created shared slot of every group the provider returned a result for
     * @throws IllegalStateException if a group already has a registered shared slot
     */
    private Map<ExecutionSlotSharingGroup, SharedSlot> allocateSharedSlots(Set<ExecutionSlotSharingGroup> executionSlotSharingGroups, SharedSlotProfileRetriever sharedSlotProfileRetriever) {
        List<PhysicalSlotRequest> slotRequests = new ArrayList<>();
        Map<ExecutionSlotSharingGroup, SharedSlot> allocatedSlots = new HashMap<>();
        // Side tables to map the provider's per-request results back to group and resources.
        Map<SlotRequestId, ExecutionSlotSharingGroup> requestToGroup = new HashMap<>();
        Map<SlotRequestId, ResourceProfile> requestToPhysicalResources = new HashMap<>();

        for (ExecutionSlotSharingGroup group : executionSlotSharingGroups) {
            SlotRequestId physicalSlotRequestId = new SlotRequestId();
            ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);
            SlotProfile slotProfile = sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
            PhysicalSlotRequest request = new PhysicalSlotRequest(physicalSlotRequestId, slotProfile, group.getLoading(), slotWillBeOccupiedIndefinitely);
            slotRequests.add(request);
            requestToGroup.put(physicalSlotRequestId, group);
            requestToPhysicalResources.put(physicalSlotRequestId, physicalSlotResourceProfile);
        }

        Map<SlotRequestId, CompletableFuture<PhysicalSlotRequest.Result>> allocateResult = slotProvider.allocatePhysicalSlots(slotRequests);
        allocateResult.forEach((slotRequestId, resultCompletableFuture) -> {
            ExecutionSlotSharingGroup group = requestToGroup.get(slotRequestId);
            CompletableFuture<PhysicalSlot> physicalSlotFuture = resultCompletableFuture.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
            SharedSlot slot = new SharedSlot(slotRequestId, requestToPhysicalResources.get(slotRequestId), group, physicalSlotFuture, slotWillBeOccupiedIndefinitely, this::releaseSharedSlot);
            allocatedSlots.put(group, slot);
            Preconditions.checkState(!sharedSlots.containsKey(group));
            sharedSlots.put(group, slot);
        });
        return allocatedSlots;
    }

    /**
     * Release callback handed to every {@link SharedSlot}: unregisters the group's shared slot
     * and cancels its physical slot request at the provider.
     *
     * @param executionSlotSharingGroup group whose shared slot is released
     * @throws NullPointerException if no shared slot is registered for the group
     * @throws IllegalStateException if the shared slot is not empty
     */
    private void releaseSharedSlot(ExecutionSlotSharingGroup executionSlotSharingGroup) {
        SharedSlot slot = sharedSlots.remove(executionSlotSharingGroup);
        Preconditions.checkNotNull(slot);
        Preconditions.checkState(slot.isEmpty(), "Trying to remove a shared slot with physical request id %s which has assigned logical slots", slot.getPhysicalSlotRequestId());
        slotProvider.cancelSlotRequest(slot.getPhysicalSlotRequestId(), new FlinkException("Slot is being returned from SlotSharingExecutionSlotAllocator."));
    }

    /**
     * Determines the resource profile to request for the physical slot of a group.
     *
     * @param executionSlotSharingGroup group to compute the profile for
     * @return the group's own profile unless it equals {@link ResourceProfile#UNKNOWN}; otherwise
     *     the merge of the profiles of all vertices in the group, starting from
     *     {@link ResourceProfile#ZERO}
     */
    private ResourceProfile getPhysicalSlotResourceProfile(ExecutionSlotSharingGroup executionSlotSharingGroup) {
        if (!executionSlotSharingGroup.getResourceProfile().equals(ResourceProfile.UNKNOWN)) {
            return executionSlotSharingGroup.getResourceProfile();
        }
        return executionSlotSharingGroup.getExecutionVertexIds().stream().reduce(ResourceProfile.ZERO, (r, e) -> r.merge(resourceProfileRetriever.apply(e)), ResourceProfile::merge);
    }

    /**
     * Builds the bulk covering all groups of the current allocation and hooks it up to the
     * physical slot futures of the involved shared slots.
     *
     * @param slots shared slot of every group in {@code executions}
     * @param executions vertices of the current allocation, grouped by slot sharing group
     * @return the created bulk
     */
    private SharingPhysicalSlotRequestBulk createBulk(Map<ExecutionSlotSharingGroup, SharedSlot> slots, Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
        Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests = executions.keySet().stream().collect(Collectors.toMap(group -> group, group -> slots.get(group).getPhysicalSlotResourceProfile()));
        SharingPhysicalSlotRequestBulk bulk = new SharingPhysicalSlotRequestBulk(executions, pendingRequests, this::cancelLogicalSlotRequest);
        registerPhysicalSlotRequestBulkCallbacks(slots, executions.keySet(), bulk);
        return bulk;
    }

    /**
     * Registers callbacks on each group's physical slot future: on success the group is marked
     * fulfilled in the bulk with the slot's allocation id; on failure the bulk's pending requests
     * are cleared.
     *
     * @param slots shared slot of every group in {@code executions}
     * @param executions groups that are part of the bulk
     * @param bulk bulk to update when the futures complete
     */
    private static void registerPhysicalSlotRequestBulkCallbacks(Map<ExecutionSlotSharingGroup, SharedSlot> slots, Iterable<ExecutionSlotSharingGroup> executions, SharingPhysicalSlotRequestBulk bulk) {
        for (ExecutionSlotSharingGroup group : executions) {
            CompletableFuture<PhysicalSlot> slotContextFuture = slots.get(group).getSlotContextFuture();
            slotContextFuture.thenAccept(physicalSlot -> bulk.markFulfilled(group, physicalSlot.getAllocationId()));
            slotContextFuture.exceptionally(t -> {
                // NOTE(review): presumably this stops the bulk from being treated as pending by
                // the timeout check once any of its requests has failed - confirm.
                bulk.clearPendingRequests();
                return null;
            });
        }
    }

    /** Pairs an execution vertex with the future of the logical slot allocated for it. */
    private static class SlotExecutionVertexAssignment {
        private final ExecutionVertexID executionVertexId;
        private final CompletableFuture<LogicalSlot> logicalSlotFuture;

        /**
         * @param executionVertexId vertex the slot is allocated for
         * @param logicalSlotFuture future of the vertex's logical slot
         * @throws NullPointerException if either argument is {@code null}
         */
        SlotExecutionVertexAssignment(ExecutionVertexID executionVertexId, CompletableFuture<LogicalSlot> logicalSlotFuture) {
            this.executionVertexId = Preconditions.checkNotNull(executionVertexId);
            this.logicalSlotFuture = Preconditions.checkNotNull(logicalSlotFuture);
        }

        /** Returns the vertex this assignment belongs to. */
        ExecutionVertexID getExecutionVertexId() {
            return executionVertexId;
        }

        /** Returns the future of the logical slot allocated for the vertex. */
        CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
            return logicalSlotFuture;
        }
    }
}

