package org.apache.storm.scheduler.resource.strategies.scheduling;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.Component;
import org.apache.storm.scheduler.resource.RAS_Node;
import org.apache.storm.scheduler.resource.RAS_Nodes;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingState;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.shade.com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import org.apache.storm.shade.org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.class */
/**
 * Resource-aware scheduling strategy that places the unassigned executors of a topology onto
 * free worker slots.
 *
 * <p>Components are ordered by a breadth-first walk of the topology graph starting from the
 * spouts; executors are then taken round-robin across components. For every executor the
 * candidate nodes are ranked by a distance built from the CPU fit, the memory fit and the
 * network distance to the node used for the previous placement, and the first free slot that
 * still has enough worker heap is chosen.
 */
public class DefaultResourceAwareStrategy implements IStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);

    /** Weight of the CPU term in the node-ranking distance. */
    private static final double CPU_WEIGHT = 1.0d;
    /** Weight of the memory term in the node-ranking distance. */
    private static final double MEM_WEIGHT = 1.0d;
    /** Weight of the network-distance term in the node-ranking distance. */
    private static final double NETWORK_WEIGHT = 1.0d;

    private Cluster _cluster;
    private Topologies _topologies;
    /** Rack id mapped to the hostnames belonging to that rack. */
    private Map<String, List<String>> _clusterInfo;
    private RAS_Nodes _nodes;
    /**
     * Node that received the most recent placement; new placements are biased towards it.
     * NOTE(review): this is not cleared by {@link #prepare}; confirm whether an instance is
     * ever reused across scheduling attempts.
     */
    private RAS_Node refNode = null;

    /**
     * Captures the cluster, topologies, nodes and network topography from the given state.
     *
     * @param schedulingState state to schedule against
     */
    @Override
    public void prepare(SchedulingState schedulingState) {
        this._cluster = schedulingState.cluster;
        this._topologies = schedulingState.topologies;
        this._nodes = schedulingState.nodes;
        this._clusterInfo = schedulingState.cluster.getNetworkTopography();
        // Building the dump walks every node, so only do it when it will be printed.
        if (LOG.isDebugEnabled()) {
            LOG.debug(getClusterInfo());
        }
    }

    /**
     * Groups the executors that still need scheduling by the position of their component in
     * {@code queue}.
     *
     * @param queue components in scheduling order
     * @param collection executors that still need scheduling
     * @return map from component position (0-based) to that component's pending executors
     */
    private TreeMap<Integer, List<ExecutorDetails>> getPriorityToExecutorDetailsListMap(Queue<Component> queue, Collection<ExecutorDetails> collection) {
        TreeMap<Integer, List<ExecutorDetails>> byPriority = new TreeMap<>();
        int priority = 0;
        for (Component component : queue) {
            List<ExecutorDetails> pending = new ArrayList<>();
            for (ExecutorDetails exec : component.execs) {
                if (collection.contains(exec)) {
                    pending.add(exec);
                }
            }
            byPriority.put(priority, pending);
            priority++;
        }
        return byPriority;
    }

    /**
     * Schedules every unassigned executor of the topology.
     *
     * @param topologyDetails topology to schedule
     * @return a success result carrying the slot-to-executors assignment, or a failure result
     *     when there are no nodes, no spouts, or some executor could not be placed
     */
    @Override
    public SchedulingResult schedule(TopologyDetails topologyDetails) {
        if (this._nodes.getNodes().size() <= 0) {
            LOG.warn("No available nodes to schedule tasks on!");
            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
        }
        Collection<ExecutorDetails> unassignedExecutors = this._cluster.getUnassignedExecutors(topologyDetails);
        LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
        List<Component> spouts = getSpouts(topologyDetails);
        if (spouts.isEmpty()) {
            LOG.error("Cannot find a Spout!");
            return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
        }

        Map<WorkerSlot, Collection<ExecutorDetails>> assignment = new HashMap<>();
        // Hash-based so the removeAll calls below stay linear.
        Collection<ExecutorDetails> scheduled = new HashSet<>();
        HashSet<ExecutorDetails> notScheduled = new HashSet<>(unassignedExecutors);

        TreeMap<Integer, List<ExecutorDetails>> byPriority = getPriorityToExecutorDetailsListMap(bfs(topologyDetails, spouts), notScheduled);
        int rounds = getLongestPriorityListSize(byPriority).intValue();
        // Round-robin: each round places at most one executor of every component.
        for (int round = 0; round < rounds; round++) {
            for (Map.Entry<Integer, List<ExecutorDetails>> entry : byPriority.entrySet()) {
                List<ExecutorDetails> pending = entry.getValue();
                if (pending.isEmpty()) {
                    continue;
                }
                ExecutorDetails exec = pending.remove(0);
                LOG.debug("\n\nAttempting to schedule: {} of component {}[avail {}] with rank {}", new Object[]{exec, topologyDetails.getExecutorToComponent().get(exec), topologyDetails.getTaskResourceReqList(exec), entry.getKey()});
                scheduleExecutor(exec, topologyDetails, assignment, scheduled);
            }
        }

        notScheduled.removeAll(scheduled);
        LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
        for (ExecutorDetails exec : notScheduled) {
            scheduleExecutor(exec, topologyDetails, assignment, scheduled);
        }
        notScheduled.removeAll(scheduled);

        if (!notScheduled.isEmpty()) {
            LOG.error("Not all executors successfully scheduled: {}", notScheduled);
            int total = topologyDetails.getExecutors().size();
            // Report against what is still unplaced now, not what was unassigned at the start.
            SchedulingResult failure = SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, (total - notScheduled.size()) + "/" + total + " executors scheduled");
            LOG.error("Topology {} not successfully scheduled!", topologyDetails.getId());
            return failure;
        }
        LOG.debug("All resources successfully scheduled!");
        return SchedulingResult.successWithMsg(assignment, "Fully Scheduled by DefaultResourceAwareStrategy");
    }

    /**
     * Places one executor: finds a slot, records it in {@code map}, consumes the node's
     * resources and adds the executor to {@code collection}. Logs an error and changes nothing
     * when no slot is found.
     *
     * @param executorDetails executor to place
     * @param topologyDetails topology the executor belongs to
     * @param map assignment built so far (updated in place)
     * @param collection executors placed so far (updated in place)
     */
    private void scheduleExecutor(ExecutorDetails executorDetails, TopologyDetails topologyDetails, Map<WorkerSlot, Collection<ExecutorDetails>> map, Collection<ExecutorDetails> collection) {
        WorkerSlot slot = findWorkerForExec(executorDetails, topologyDetails, map);
        if (slot == null) {
            LOG.error("Not Enough Resources to schedule Task {}", executorDetails);
            return;
        }
        RAS_Node node = idToNode(slot.getNodeId());
        Collection<ExecutorDetails> onSlot = map.get(slot);
        if (onSlot == null) {
            onSlot = new LinkedList<>();
            map.put(slot, onSlot);
        }
        onSlot.add(executorDetails);
        node.consumeResourcesforTask(executorDetails, topologyDetails);
        collection.add(executorDetails);
        LOG.debug("TASK {} assigned to Node: {} avail [mem: {} cpu: {}] total [mem: {} cpu: {}] on slot: {}", new Object[]{executorDetails, node, node.getAvailableMemoryResources(), node.getAvailableCpuResources(), node.getTotalMemoryResources(), node.getTotalCpuResources(), slot});
    }

    /**
     * Finds a slot for the executor and, when one is found, makes its node the new reference
     * node. The first search (no reference node yet) is restricted to the rack returned by
     * {@link #getBestClustering()}.
     *
     * @return the chosen slot, or {@code null} when none fits
     */
    private WorkerSlot findWorkerForExec(ExecutorDetails executorDetails, TopologyDetails topologyDetails, Map<WorkerSlot, Collection<ExecutorDetails>> map) {
        WorkerSlot slot;
        if (this.refNode == null) {
            slot = getBestWorker(executorDetails, topologyDetails, getBestClustering(), map);
        } else {
            slot = getBestWorker(executorDetails, topologyDetails, map);
        }
        if (slot != null) {
            this.refNode = idToNode(slot.getNodeId());
        }
        LOG.debug("reference node for the resource aware scheduler is: {}", this.refNode);
        return slot;
    }

    /** Searches all racks for the best slot; see the four-argument overload. */
    private WorkerSlot getBestWorker(ExecutorDetails executorDetails, TopologyDetails topologyDetails, Map<WorkerSlot, Collection<ExecutorDetails>> map) {
        return getBestWorker(executorDetails, topologyDetails, null, map);
    }

    /**
     * Ranks the candidate nodes and returns the first free slot, in rank order, that passes
     * {@link #checkWorkerConstraints}.
     *
     * <p>Only nodes with a free slot and enough available memory and CPU are ranked. The rank is
     * the Euclidean norm of the weighted CPU term, memory term and (when a reference node
     * exists) network distance; smaller is better.
     *
     * @param str rack id to restrict the search to, or {@code null} to consider every rack
     * @return the chosen slot, or {@code null} when none fits
     */
    private WorkerSlot getBestWorker(ExecutorDetails executorDetails, TopologyDetails topologyDetails, String str, Map<WorkerSlot, Collection<ExecutorDetails>> map) {
        double memReq = topologyDetails.getTotalMemReqTask(executorDetails).doubleValue();
        double cpuReq = topologyDetails.getTotalCpuReqTask(executorDetails).doubleValue();
        List<RAS_Node> candidates = str != null ? getAvailableNodesFromCluster(str) : getAvailableNodes();

        // Distance -> every node with that distance. Keeping a list per key prevents nodes with
        // identical scores from overwriting one another.
        TreeMap<Double, List<RAS_Node>> ranked = new TreeMap<>();
        for (RAS_Node node : candidates) {
            if (node.getFreeSlots().size() <= 0) {
                continue;
            }
            double availMem = node.getAvailableMemoryResources().doubleValue();
            double availCpu = node.getAvailableCpuResources().doubleValue();
            if (availMem < memReq || availCpu < cpuReq) {
                continue;
            }
            double cpuTerm = Math.pow(((cpuReq - availCpu) / (availCpu + 1.0d)) * CPU_WEIGHT, 2.0d);
            double memTerm = Math.pow(((memReq - availMem) / (availMem + 1.0d)) * MEM_WEIGHT, 2.0d);
            double networkTerm = 0.0d;
            if (this.refNode != null) {
                networkTerm = Math.pow(distToNode(this.refNode, node).doubleValue() * NETWORK_WEIGHT, 2.0d);
            }
            Double distance = Double.valueOf(Math.sqrt(cpuTerm + memTerm + networkTerm));
            List<RAS_Node> tied = ranked.get(distance);
            if (tied == null) {
                tied = new LinkedList<>();
                ranked.put(distance, tied);
            }
            // Latest node first: on a tie the most recently seen node keeps precedence, the
            // earlier ones remain available as fallbacks.
            tied.add(0, node);
        }

        for (List<RAS_Node> tied : ranked.values()) {
            for (RAS_Node node : tied) {
                for (WorkerSlot slot : node.getFreeSlots()) {
                    if (checkWorkerConstraints(executorDetails, slot, topologyDetails, map)) {
                        return slot;
                    }
                }
            }
        }
        return null;
    }

    /**
     * Picks the rack with the largest total of available memory plus available CPU.
     *
     * @return the rack id, or {@code null} when no rack has a total greater than zero
     */
    private String getBestClustering() {
        String bestRack = null;
        double bestTotal = 0.0d;
        for (Map.Entry<String, List<String>> rack : this._clusterInfo.entrySet()) {
            double total = getTotalClusterRes(rack.getValue()).doubleValue();
            if (total > bestTotal) {
                bestTotal = total;
                bestRack = rack.getKey();
            }
        }
        return bestRack;
    }

    /**
     * Sums available memory and available CPU over the given hosts. Hosts that do not resolve
     * to a node contribute nothing.
     *
     * @param list hostnames of one rack
     * @return the summed resources
     */
    private Double getTotalClusterRes(List<String> list) {
        double total = 0.0d;
        for (String hostname : list) {
            RAS_Node node = hostnameToNode(hostname);
            if (node == null) {
                continue;
            }
            total += node.getAvailableMemoryResources().doubleValue() + node.getAvailableCpuResources().doubleValue();
        }
        return Double.valueOf(total);
    }

    /**
     * Network distance between two nodes.
     *
     * @return 0.0 for the same node, 0.5 for two nodes on the same rack, 1.0 otherwise
     *     (including when a node's rack cannot be determined)
     */
    private Double distToNode(RAS_Node rAS_Node, RAS_Node rAS_Node2) {
        if (rAS_Node.getId().equals(rAS_Node2.getId())) {
            return Double.valueOf(0.0d);
        }
        String rack = NodeToCluster(rAS_Node);
        // A node with no known rack cannot be shown to share one.
        if (rack != null && rack.equals(NodeToCluster(rAS_Node2))) {
            return Double.valueOf(0.5d);
        }
        return Double.valueOf(1.0d);
    }

    /**
     * Finds the rack whose host list contains the node's hostname.
     *
     * @return the rack id, or {@code null} (after logging an error) when no rack lists it
     */
    private String NodeToCluster(RAS_Node rAS_Node) {
        String hostname = rAS_Node.getHostname();
        for (Map.Entry<String, List<String>> rack : this._clusterInfo.entrySet()) {
            if (rack.getValue().contains(hostname)) {
                return rack.getKey();
            }
        }
        LOG.error("Node: {} not found in any clusters", hostname);
        return null;
    }

    /** Returns the nodes of every rack, rack by rack. */
    private List<RAS_Node> getAvailableNodes() {
        List<RAS_Node> nodes = new LinkedList<>();
        for (String rackId : this._clusterInfo.keySet()) {
            nodes.addAll(getAvailableNodesFromCluster(rackId));
        }
        return nodes;
    }

    /**
     * Returns the nodes of one rack. An unknown rack id yields an empty list, and hosts that do
     * not resolve to a node are left out.
     *
     * @param str rack id
     */
    private List<RAS_Node> getAvailableNodesFromCluster(String str) {
        List<RAS_Node> nodes = new ArrayList<>();
        List<String> hostnames = this._clusterInfo.get(str);
        if (hostnames == null) {
            return nodes;
        }
        for (String hostname : hostnames) {
            RAS_Node node = hostnameToNode(hostname);
            if (node != null) {
                nodes.add(node);
            }
        }
        return nodes;
    }

    /** Returns the free slots of every node in one rack. Not called within this class. */
    private List<WorkerSlot> getAvailableWorkersFromCluster(String str) {
        List<WorkerSlot> slots = new LinkedList<>();
        for (RAS_Node node : getAvailableNodesFromCluster(str)) {
            slots.addAll(node.getFreeSlots());
        }
        return slots;
    }

    /** Returns the free slots of every node in every rack. Not called within this class. */
    private List<WorkerSlot> getAvailableWorker() {
        List<WorkerSlot> slots = new LinkedList<>();
        for (String rackId : this._clusterInfo.keySet()) {
            slots.addAll(getAvailableWorkersFromCluster(rackId));
        }
        return slots;
    }

    /**
     * Orders components by a breadth-first walk that follows both child and parent links,
     * started from each not-yet-visited spout in turn.
     *
     * @param topologyDetails topology providing the component map
     * @param list spouts to start from
     * @return components in visiting order, each at most once
     */
    private Queue<Component> bfs(TopologyDetails topologyDetails, List<Component> list) {
        Queue<Component> ordered = new LinkedList<>();
        HashSet<String> visited = new HashSet<>();
        // Looked up once; every neighbour resolution below reads from the same map.
        Map<String, Component> components = topologyDetails.getComponents();
        for (Component spout : list) {
            if (!visited.add(spout.id)) {
                continue;
            }
            Queue<Component> frontier = new LinkedList<>();
            frontier.offer(spout);
            while (!frontier.isEmpty()) {
                Component current = frontier.poll();
                ordered.add(current);
                List<String> neighbours = new ArrayList<>();
                neighbours.addAll(current.children);
                neighbours.addAll(current.parents);
                for (String neighbourId : neighbours) {
                    if (!visited.add(neighbourId)) {
                        continue;
                    }
                    Component neighbour = components.get(neighbourId);
                    if (neighbour == null) {
                        // Enqueuing null would fail on the next poll; skip the dangling link.
                        LOG.warn("Component {} referenced by {} is not part of the topology; skipping", neighbourId, current.id);
                        continue;
                    }
                    frontier.offer(neighbour);
                }
            }
        }
        return ordered;
    }

    /** Returns the components of the topology whose type is {@code SPOUT}. */
    private List<Component> getSpouts(TopologyDetails topologyDetails) {
        List<Component> spouts = new ArrayList<>();
        for (Component component : topologyDetails.getComponents().values()) {
            if (component.type == Component.ComponentType.SPOUT) {
                spouts.add(component);
            }
        }
        return spouts;
    }

    /** Returns the size of the longest executor list in the map, or 0 for an empty map. */
    private Integer getLongestPriorityListSize(Map<Integer, List<ExecutorDetails>> map) {
        int longest = 0;
        for (List<ExecutorDetails> execs : map.values()) {
            longest = Math.max(longest, execs.size());
        }
        return Integer.valueOf(longest);
    }

    /** Returns the topology's worker max heap size minus the memory already planned on the slot. */
    private Double getWorkerScheduledMemoryAvailable(WorkerSlot workerSlot, TopologyDetails topologyDetails, Map<WorkerSlot, Collection<ExecutorDetails>> map) {
        double used = getWorkerScheduledMemoryUse(workerSlot, topologyDetails, map).doubleValue();
        return Double.valueOf(topologyDetails.getTopologyWorkerMaxHeapSize().doubleValue() - used);
    }

    /**
     * Sums the memory requirement of every executor that {@code map} already places on the
     * slot; 0 when the slot has no entry.
     */
    private Double getWorkerScheduledMemoryUse(WorkerSlot workerSlot, TopologyDetails topologyDetails, Map<WorkerSlot, Collection<ExecutorDetails>> map) {
        double used = 0.0d;
        Collection<ExecutorDetails> onSlot = map.get(workerSlot);
        if (onSlot != null) {
            for (ExecutorDetails exec : onSlot) {
                used += topologyDetails.getTotalMemReqTask(exec).doubleValue();
            }
        }
        return Double.valueOf(used);
    }

    /** Returns whether the slot's remaining planned heap covers the executor's memory requirement. */
    private boolean checkWorkerConstraints(ExecutorDetails executorDetails, WorkerSlot workerSlot, TopologyDetails topologyDetails, Map<WorkerSlot, Collection<ExecutorDetails>> map) {
        double available = getWorkerScheduledMemoryAvailable(workerSlot, topologyDetails, map).doubleValue();
        return available >= topologyDetails.getTotalMemReqTask(executorDetails).doubleValue();
    }

    /**
     * Builds a multi-line, human-readable dump of every rack with the available and total
     * memory/CPU of its nodes. Hosts that do not resolve to a node are omitted.
     */
    private String getClusterInfo() {
        StringBuilder info = new StringBuilder("Cluster info:\n");
        for (Map.Entry<String, List<String>> rack : this._clusterInfo.entrySet()) {
            info.append("Rack: ").append(rack.getKey()).append("\n");
            for (String hostname : rack.getValue()) {
                RAS_Node node = hostnameToNode(hostname);
                if (node == null) {
                    continue;
                }
                info.append("-> Node: ").append(node.getHostname()).append(" ").append(node.getId()).append("\n");
                info.append("--> Avail Resources: {Mem ").append(node.getAvailableMemoryResources()).append(", CPU ").append(node.getAvailableCpuResources()).append("}\n");
                info.append("--> Total Resources: {Mem ").append(node.getTotalMemoryResources()).append(", CPU ").append(node.getTotalCpuResources()).append("}\n");
            }
        }
        return info.toString();
    }

    /**
     * Resolves a hostname to its node.
     *
     * @return the node, or {@code null} when the hostname is unknown (the failed hostname
     *     lookup is logged by {@link #NodeHostnameToId})
     */
    private RAS_Node hostnameToNode(String hostname) {
        String nodeId = NodeHostnameToId(hostname);
        return nodeId == null ? null : this._nodes.getNodeById(nodeId);
    }

    /**
     * Looks up the id of the node with the given hostname by scanning all nodes.
     *
     * @param str hostname to look for
     * @return the node id, or {@code null} (after logging an error) when no node matches
     */
    public String NodeHostnameToId(String str) {
        for (RAS_Node node : this._nodes.getNodes()) {
            String hostname = node.getHostname();
            if (hostname != null && hostname.equals(str)) {
                return node.getId();
            }
        }
        LOG.error("Cannot find Node with hostname {}", str);
        return null;
    }

    /**
     * Looks up a node by id.
     *
     * @param str node id
     * @return the node, or {@code null} (after logging an error) when the id is unknown
     */
    public RAS_Node idToNode(String str) {
        RAS_Node node = this._nodes.getNodeById(str);
        if (node == null) {
            LOG.error("Cannot find Node with Id: {}", str);
        }
        return node;
    }
}
