/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskBalancedExecutionSlotSharingGroupBuilder {
    public static final Logger LOG = LoggerFactory.getLogger(TaskBalancedExecutionSlotSharingGroupBuilder.class);
    private final Map<JobVertexID, List<ExecutionVertexID>> allVertices;
    private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
    private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> paralleledExecutionSlotSharingGroupsMap;
    private final Map<SlotSharingGroup, Integer> slotSharingGroupIndexMap;
    private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap;
    private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
    private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> constraintToExecutionSlotSharingGroupMap;

    public TaskBalancedExecutionSlotSharingGroupBuilder(Map<JobVertexID, List<ExecutionVertexID>> allVertices, Collection<SlotSharingGroup> slotSharingGroups, Collection<CoLocationGroup> coLocationGroups) {
        this.allVertices = Preconditions.checkNotNull(allVertices);
        this.coLocationGroupMap = new HashMap<JobVertexID, CoLocationGroup>();
        for (CoLocationGroup coLocationGroup : coLocationGroups) {
            for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) {
                this.coLocationGroupMap.put(jobVertexId, coLocationGroup);
            }
        }
        this.constraintToExecutionSlotSharingGroupMap = new HashMap<CoLocationConstraint, ExecutionSlotSharingGroup>();
        this.paralleledExecutionSlotSharingGroupsMap = new HashMap<SlotSharingGroup, List<ExecutionSlotSharingGroup>>(slotSharingGroups.size());
        this.slotSharingGroupIndexMap = new HashMap<SlotSharingGroup, Integer>(slotSharingGroups.size());
        this.slotSharingGroupMap = new HashMap<JobVertexID, SlotSharingGroup>();
        this.executionSlotSharingGroupMap = new HashMap<ExecutionVertexID, ExecutionSlotSharingGroup>();
        for (SlotSharingGroup slotSharingGroup : slotSharingGroups) {
            for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) {
                this.slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
            }
        }
    }

    public Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
        this.initParalleledExecutionSlotSharingGroupsMap(this.allVertices);
        for (Map.Entry<JobVertexID, List<ExecutionVertexID>> executionVertexInfos : this.allVertices.entrySet()) {
            JobVertexID jobVertexID = executionVertexInfos.getKey();
            List<ExecutionVertexID> executionVertexIds = executionVertexInfos.getValue();
            SlotSharingGroup slotSharingGroup = this.slotSharingGroupMap.get(jobVertexID);
            if (!this.coLocationGroupMap.containsKey(jobVertexID)) {
                this.allocateNonCoLocatedVertices(slotSharingGroup, executionVertexIds);
                continue;
            }
            this.allocateCoLocatedVertices(slotSharingGroup, executionVertexIds);
        }
        return this.executionSlotSharingGroupMap;
    }

    private void initParalleledExecutionSlotSharingGroupsMap(Map<JobVertexID, List<ExecutionVertexID>> allVertices) {
        allVertices.entrySet().stream().map(jobVertexExecutionVertices -> Tuple2.of(this.slotSharingGroupMap.get(jobVertexExecutionVertices.getKey()), ((List)jobVertexExecutionVertices.getValue()).size())).collect(Collectors.groupingBy(tuple -> (SlotSharingGroup)tuple.f0, Collectors.summarizingInt(tuple -> (Integer)tuple.f1))).forEach((slotSharingGroup, statistics) -> {
            int slotNum = statistics.getMax();
            this.paralleledExecutionSlotSharingGroupsMap.put((SlotSharingGroup)slotSharingGroup, this.createExecutionSlotSharingGroups((SlotSharingGroup)slotSharingGroup, slotNum));
        });
    }

    private List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(SlotSharingGroup slotSharingGroup, int slotNum) {
        ArrayList<ExecutionSlotSharingGroup> executionSlotSharingGroups = new ArrayList<ExecutionSlotSharingGroup>(slotNum);
        for (int i = 0; i < slotNum; ++i) {
            ExecutionSlotSharingGroup executionSlotSharingGroup = new ExecutionSlotSharingGroup(slotSharingGroup);
            executionSlotSharingGroups.add(i, executionSlotSharingGroup);
            LOG.debug("Create {}-th(st/nd) executionSlotSharingGroup {}.", (Object)i, (Object)executionSlotSharingGroup);
        }
        return executionSlotSharingGroups;
    }

    private void allocateCoLocatedVertices(SlotSharingGroup slotSharingGroup, List<ExecutionVertexID> executionVertexIds) {
        List<ExecutionSlotSharingGroup> executionSlotSharingGroups = this.paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
        for (ExecutionVertexID executionVertexId : executionVertexIds) {
            CoLocationConstraint coLocationConstraint = this.getCoLocationConstraint(executionVertexId);
            ExecutionSlotSharingGroup executionSlotSharingGroup = this.constraintToExecutionSlotSharingGroupMap.get(coLocationConstraint);
            if (Objects.isNull(executionSlotSharingGroup)) {
                executionSlotSharingGroup = executionSlotSharingGroups.get(this.getLeastUtilizeSlotIndex(executionSlotSharingGroups, executionVertexId));
                this.constraintToExecutionSlotSharingGroupMap.put(coLocationConstraint, executionSlotSharingGroup);
            }
            this.addVertexToExecutionSlotSharingGroup(executionSlotSharingGroup, executionVertexId);
        }
        int jobVertexParallel = executionVertexIds.size();
        if (!this.isMaxParallelism(jobVertexParallel, slotSharingGroup)) {
            int index = this.getLeastUtilizeSlotIndex(executionSlotSharingGroups, null);
            this.updateSlotRoundRobinIndexIfNeeded(jobVertexParallel, slotSharingGroup, index);
        }
    }

    private void allocateNonCoLocatedVertices(SlotSharingGroup slotSharingGroup, List<ExecutionVertexID> executionVertices) {
        int jobVertexParallel = executionVertices.size();
        int index = this.getSlotRoundRobinIndex(jobVertexParallel, slotSharingGroup);
        List<ExecutionSlotSharingGroup> executionSlotSharingGroups = this.paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
        for (ExecutionVertexID executionVertexId : executionVertices) {
            this.addVertexToExecutionSlotSharingGroup(executionSlotSharingGroups.get(index), executionVertexId);
            ++index;
            index %= executionSlotSharingGroups.size();
        }
        this.updateSlotRoundRobinIndexIfNeeded(executionVertices.size(), slotSharingGroup, index);
    }

    private void addVertexToExecutionSlotSharingGroup(ExecutionSlotSharingGroup executionSlotSharingGroup, ExecutionVertexID executionVertexId) {
        executionSlotSharingGroup.addVertex(executionVertexId);
        this.executionSlotSharingGroupMap.put(executionVertexId, executionSlotSharingGroup);
    }

    private CoLocationConstraint getCoLocationConstraint(ExecutionVertexID executionVertexId) {
        JobVertexID jobVertexID = executionVertexId.getJobVertexId();
        int subtaskIndex = executionVertexId.getSubtaskIndex();
        return this.coLocationGroupMap.get(jobVertexID).getLocationConstraint(subtaskIndex);
    }

    private int getSlotRoundRobinIndex(int jobVertexParallelism, SlotSharingGroup slotSharingGroup) {
        boolean maxParallel = this.isMaxParallelism(jobVertexParallelism, slotSharingGroup);
        return maxParallel ? 0 : this.slotSharingGroupIndexMap.getOrDefault(slotSharingGroup, 0);
    }

    private void updateSlotRoundRobinIndexIfNeeded(int jobVertexParallelism, SlotSharingGroup slotSharingGroup, int nextIndex) {
        if (!this.isMaxParallelism(jobVertexParallelism, slotSharingGroup)) {
            this.slotSharingGroupIndexMap.put(slotSharingGroup, nextIndex);
        }
    }

    private boolean isMaxParallelism(int jobVertexParallelism, SlotSharingGroup slotSharingGroup) {
        List<ExecutionSlotSharingGroup> executionSlotSharingGroups = this.paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
        return jobVertexParallelism == executionSlotSharingGroups.size();
    }

    private int getLeastUtilizeSlotIndex(List<ExecutionSlotSharingGroup> executionSlotSharingGroups, @Nullable ExecutionVertexID executionVertexId) {
        int indexWithLeastExecutionVertices = 0;
        int leastExecutionVertices = Integer.MAX_VALUE;
        for (int index = 0; index < executionSlotSharingGroups.size(); ++index) {
            ExecutionSlotSharingGroup executionSlotSharingGroup = executionSlotSharingGroups.get(index);
            int executionVertices = executionSlotSharingGroup.getExecutionVertexIds().size();
            if (leastExecutionVertices <= executionVertices || !Objects.isNull(executionVertexId) && !this.allocatable(executionSlotSharingGroup, executionVertexId)) continue;
            indexWithLeastExecutionVertices = index;
            leastExecutionVertices = executionVertices;
        }
        return indexWithLeastExecutionVertices;
    }

    private boolean allocatable(ExecutionSlotSharingGroup executionSlotSharingGroup, @Nonnull ExecutionVertexID executionVertexId) {
        JobVertexID jobVertexId = executionVertexId.getJobVertexId();
        Set allocatedJobVertices = executionSlotSharingGroup.getExecutionVertexIds().stream().map(ExecutionVertexID::getJobVertexId).collect(Collectors.toSet());
        return !allocatedJobVertices.contains(jobVertexId);
    }
}

