/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang.Validate;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.DefaultScheduler;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IsolationScheduler
implements IScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(IsolationScheduler.class);
    private Map<String, Number> isoMachines;

    /**
     * Loads the isolation settings from the scheduler configuration.
     *
     * <p>The value stored under {@code "isolation.scheduler.machines"} is kept as the
     * topology-name to machine-count map and then passed to {@code Validate.notEmpty}.
     *
     * @param conf scheduler configuration
     * @param metricsRegistry metrics registry; not used by this scheduler
     */
    @Override
    public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
        Object configured = conf.get("isolation.scheduler.machines");
        // The configuration is untyped; the element types cannot be checked at runtime.
        @SuppressWarnings("unchecked")
        Map<String, Number> machines = (Map<String, Number>) configured;
        // Assign before validating so the field state on failure matches the validation input.
        this.isoMachines = machines;
        Validate.notEmpty(this.isoMachines);
    }

    /**
     * Returns this scheduler's configuration, which is always an empty map.
     *
     * @return an empty map
     */
    @Override
    public Map<String, Map<String, Double>> config() {
        Map<String, Map<String, Double>> noConfig = Collections.emptyMap();
        return noConfig;
    }

    /**
     * Places isolated topologies on dedicated hosts, then schedules the remaining topologies.
     *
     * <p>The pass runs in this order:
     * <ol>
     *   <li>Every host that currently has assignments is inspected. If all of its workers belong
     *       to one isolated topology, the worker count matches an outstanding entry of that
     *       topology's machine distribution and every worker matches an outstanding worker spec,
     *       the host is kept as is and blacklisted. Otherwise every slot on that host that is
     *       used by an isolated topology is freed.</li>
     *   <li>For each isolated topology, the outstanding per-machine worker counts are matched
     *       against the head of the assignable-host list. A host that is taken has its used
     *       slots freed, receives the workers and is blacklisted.</li>
     *   <li>If any isolated topology still has unassigned worker specs, the used slots of all
     *       non-blacklisted hosts are freed. Otherwise the topologies that are not fully
     *       allocated isolated topologies are passed to {@link DefaultScheduler}.</li>
     *   <li>The blacklist that was in effect when the method was entered is restored.</li>
     * </ol>
     *
     * @param topologies all topologies known to the scheduler
     * @param cluster cluster state that is read and modified by this pass
     */
    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        // Copy the blacklist up front: the pass below adds hosts to it, and reading it only at the
        // end (or holding a live reference) would make the final restore a no-op.
        Set<String> origBlacklist = new HashSet<String>(cluster.getBlacklistedHosts());

        List<TopologyDetails> isoTopologies = this.isolatedTopologies(topologies.getTopologies());
        Set<String> isoIds = this.extractTopologyIds(isoTopologies);
        Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs = this.topologyWorkerSpecs(isoTopologies);
        Map<String, Map<Integer, Integer>> topologyMachineDistributions = this.topologyMachineDistributions(isoTopologies);
        Map<String, List<AssignmentInfo>> hostAssignments = this.hostAssignments(cluster);

        // Step 1: keep hosts that already carry a valid isolated placement, evict the rest.
        for (Map.Entry<String, List<AssignmentInfo>> entry : hostAssignments.entrySet()) {
            List<AssignmentInfo> assignments = entry.getValue();
            String topologyId = assignments.get(0).getTopologyId();
            // Both lookups yield null for a non-isolated topology; the isoIds check below
            // short-circuits before either value is dereferenced.
            Map<Integer, Integer> distribution = topologyMachineDistributions.get(topologyId);
            Set<Set<ExecutorDetails>> workerSpecs = topologyWorkerSpecs.get(topologyId);
            int numWorkers = assignments.size();

            boolean validIsolatedHost = isoIds.contains(topologyId)
                    && this.checkAssignmentTopology(assignments, topologyId)
                    && distribution.containsKey(numWorkers)
                    && this.checkAssignmentWorkerSpecs(assignments, workerSpecs);

            if (validIsolatedHost) {
                this.decrementDistribution(distribution, numWorkers);
                for (AssignmentInfo ass : assignments) {
                    workerSpecs.remove(ass.getExecutors());
                }
                cluster.blacklistHost(entry.getKey());
            } else {
                for (AssignmentInfo ass : assignments) {
                    if (isoIds.contains(ass.getTopologyId())) {
                        cluster.freeSlot(ass.getWorkerSlot());
                    }
                }
            }
        }

        // Step 2: place the still-outstanding isolated workers on whole hosts.
        Map<String, Set<WorkerSlot>> hostUsedSlots = this.hostToUsedSlots(cluster);
        LinkedList<HostAssignableSlots> hss = this.hostAssignableSlots(cluster);
        for (Map.Entry<String, Set<Set<ExecutorDetails>>> entry : topologyWorkerSpecs.entrySet()) {
            String topologyId = entry.getKey();
            Set<Set<ExecutorDetails>> executorSet = entry.getValue();
            List<Integer> workerNum = this.distributionToSortedAmounts(topologyMachineDistributions.get(topologyId));
            for (Integer num : workerNum) {
                HostAssignableSlots hostSlots = hss.peek();
                List<WorkerSlot> slot = hostSlots != null ? hostSlots.getWorkerSlots() : null;
                if (slot == null || slot.size() < num) {
                    // Only the head of the list is considered; leave it in place for a smaller request.
                    continue;
                }
                hss.poll();
                // May be null when the host has no used slots; passed through unchanged.
                Collection<WorkerSlot> usedOnHost = hostUsedSlots.get(hostSlots.getHostName());
                cluster.freeSlots(usedOnHost);
                for (WorkerSlot tmpSlot : slot.subList(0, num)) {
                    Set<ExecutorDetails> executor = this.removeElemFromExecutorsSet(executorSet);
                    cluster.assign(tmpSlot, topologyId, executor);
                }
                cluster.blacklistHost(hostSlots.getHostName());
            }
        }

        // Step 3: either clear the non-isolated hosts or schedule everything else.
        List<String> failedTopologyIds = this.extractFailedTopologyIds(topologyWorkerSpecs);
        if (!failedTopologyIds.isEmpty()) {
            LOG.warn("Unable to isolate topologies " + failedTopologyIds + ". No machine had enough worker slots to run the remaining workers for these topologies. Clearing all other resources and will wait for enough resources for isolated topologies before allocating any other resources.");
            Map<String, Set<WorkerSlot>> usedSlots = this.hostToUsedSlots(cluster);
            for (Map.Entry<String, Set<WorkerSlot>> entry : usedSlots.entrySet()) {
                if (!cluster.isBlacklistedHost(entry.getKey())) {
                    Collection<WorkerSlot> toFree = entry.getValue();
                    cluster.freeSlots(toFree);
                }
            }
        } else {
            Set<String> allocatedTopologies = this.allocatedTopologies(topologyWorkerSpecs);
            Topologies leftOverTopologies = this.leftoverTopologies(topologies, allocatedTopologies);
            DefaultScheduler.defaultSchedule(leftOverTopologies, cluster);
        }

        // Step 4: drop the hosts blacklisted by this pass and restore the caller's blacklist.
        cluster.setBlacklistedHosts(origBlacklist);
    }

    /**
     * Removes one worker spec from the given set and returns it.
     *
     * <p>The element taken is the first one produced by the set's iterator. An empty set makes
     * {@code Iterator.next()} throw {@link java.util.NoSuchElementException}.
     *
     * @param executorsSets outstanding worker specs; one element is removed
     * @return the removed worker spec
     */
    private Set<ExecutorDetails> removeElemFromExecutorsSet(Set<Set<ExecutorDetails>> executorsSets) {
        Iterator<Set<ExecutorDetails>> cursor = executorsSets.iterator();
        Set<ExecutorDetails> picked = cursor.next();
        cursor.remove();
        return picked;
    }

    /**
     * Selects the topologies whose name is a key of the isolation machine map.
     *
     * @param topologies candidate topologies
     * @return the matching topologies, in the iteration order of {@code topologies}
     */
    private List<TopologyDetails> isolatedTopologies(Collection<TopologyDetails> topologies) {
        Set<String> isolatedNames = this.isoMachines.keySet();
        List<TopologyDetails> selected = new ArrayList<TopologyDetails>();
        for (TopologyDetails candidate : topologies) {
            if (isolatedNames.contains(candidate.getName())) {
                selected.add(candidate);
            }
        }
        return selected;
    }

    /**
     * Collects the ids of the given topologies.
     *
     * @param topologies topologies to read ids from; may be {@code null}
     * @return a new set of topology ids, empty when {@code topologies} is {@code null} or empty
     */
    private Set<String> extractTopologyIds(List<TopologyDetails> topologies) {
        Set<String> topologyIds = new HashSet<String>();
        if (topologies == null) {
            return topologyIds;
        }
        for (TopologyDetails details : topologies) {
            topologyIds.add(details.getId());
        }
        return topologyIds;
    }

    /**
     * Lists the topologies that still have worker specs left to place.
     *
     * @param isoTopologyWorkerSpecs outstanding worker specs keyed by topology id
     * @return ids whose worker-spec set is neither {@code null} nor empty
     */
    private List<String> extractFailedTopologyIds(Map<String, Set<Set<ExecutorDetails>>> isoTopologyWorkerSpecs) {
        List<String> unplaced = new ArrayList<String>();
        for (Map.Entry<String, Set<Set<ExecutorDetails>>> specsById : isoTopologyWorkerSpecs.entrySet()) {
            Set<Set<ExecutorDetails>> remaining = specsById.getValue();
            boolean hasRemaining = remaining != null && !remaining.isEmpty();
            if (hasRemaining) {
                unplaced.add(specsById.getKey());
            }
        }
        return unplaced;
    }

    /**
     * Computes the worker specs of every given topology.
     *
     * @param topologies topologies to compute worker specs for
     * @return a new map from topology id to the result of {@code computeWorkerSpecs}
     */
    private Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs(List<TopologyDetails> topologies) {
        Map<String, Set<Set<ExecutorDetails>>> specsById = new HashMap<String, Set<Set<ExecutorDetails>>>();
        for (TopologyDetails details : topologies) {
            String id = details.getId();
            Set<Set<ExecutorDetails>> specs = this.computeWorkerSpecs(details);
            specsById.put(id, specs);
        }
        return specsById;
    }

    /**
     * Groups the cluster's current assignments by host.
     *
     * <p>For every scheduler assignment, the executor-to-slot map is reversed so that each used
     * worker slot yields one {@code AssignmentInfo} holding the slot, the topology id and the
     * set of executors on that slot. The host is resolved from the slot's node id.
     *
     * @param cluster cluster whose assignments are read
     * @return a new map from host name to the worker assignments on that host; every list has
     *     at least one element
     */
    private Map<String, List<AssignmentInfo>> hostAssignments(Cluster cluster) {
        Map<String, List<AssignmentInfo>> assignmentsByHost = new HashMap<String, List<AssignmentInfo>>();
        for (SchedulerAssignment assignment : cluster.getAssignments().values()) {
            Map<WorkerSlot, List<ExecutorDetails>> slotExecutors = Utils.reverseMap(assignment.getExecutorToSlot());
            for (Map.Entry<WorkerSlot, List<ExecutorDetails>> slotEntry : slotExecutors.entrySet()) {
                WorkerSlot slot = slotEntry.getKey();
                String host = cluster.getHost(slot.getNodeId());
                AssignmentInfo info = new AssignmentInfo(
                        slot, assignment.getTopologyId(), new HashSet<ExecutorDetails>(slotEntry.getValue()));

                List<AssignmentInfo> onHost = assignmentsByHost.get(host);
                if (onHost == null) {
                    onHost = new ArrayList<AssignmentInfo>();
                    assignmentsByHost.put(host, onHost);
                }
                onHost.add(info);
            }
        }
        return assignmentsByHost;
    }

    /**
     * Splits a topology's executors into per-worker groups.
     *
     * <p>The executor-to-component map is reversed, the executors of all components are
     * concatenated, and they are then dealt round-robin into {@code getNumWorkers()} buckets.
     * A bucket only exists once an executor has been put into it, so the result has fewer
     * elements than workers when there are fewer executors than workers.
     *
     * <p>NOTE(review): the modulo below assumes {@code getNumWorkers()} is positive whenever the
     * topology has executors; confirm against callers.
     *
     * @param topology topology to compute worker specs for
     * @return a new set containing one executor set per non-empty bucket
     */
    private Set<Set<ExecutorDetails>> computeWorkerSpecs(TopologyDetails topology) {
        Map<String, List<ExecutorDetails>> compExecutors = Utils.reverseMap(topology.getExecutorToComponent());
        List<ExecutorDetails> allExecutors = new ArrayList<ExecutorDetails>();
        for (List<ExecutorDetails> componentExecutors : compExecutors.values()) {
            allExecutors.addAll(componentExecutors);
        }

        int numWorkers = topology.getNumWorkers();
        int bucketIndex = 0;
        Map<Integer, Set<ExecutorDetails>> bucketExecutors = new HashMap<Integer, Set<ExecutorDetails>>(numWorkers);
        for (ExecutorDetails executor : allExecutors) {
            Set<ExecutorDetails> bucket = bucketExecutors.get(bucketIndex);
            if (bucket == null) {
                bucket = new HashSet<ExecutorDetails>();
                bucketExecutors.put(bucketIndex, bucket);
            }
            bucket.add(executor);
            bucketIndex = (bucketIndex + 1) % numWorkers;
        }
        return new HashSet<Set<ExecutorDetails>>(bucketExecutors.values());
    }

    /**
     * Computes the machine distribution of every isolated topology.
     *
     * @param isoTopologies isolated topologies
     * @return a new map from topology id to the result of {@code machineDistribution}
     */
    private Map<String, Map<Integer, Integer>> topologyMachineDistributions(List<TopologyDetails> isoTopologies) {
        Map<String, Map<Integer, Integer>> distributionsById = new HashMap<String, Map<Integer, Integer>>();
        for (TopologyDetails details : isoTopologies) {
            String id = details.getId();
            Map<Integer, Integer> distribution = this.machineDistribution(details);
            distributionsById.put(id, distribution);
        }
        return distributionsById;
    }

    /**
     * Computes how the workers of a topology are spread over its isolated machines.
     *
     * <p>The machine count is looked up in the isolation machine map by topology name and the
     * topology's worker count is divided over it with {@code Utils.integerDivided}. Any entry
     * with key {@code 0} is dropped from the result.
     *
     * @param topology isolated topology; its name must be present in the isolation machine map
     * @return the distribution map with the {@code 0} key removed; callers in this class treat
     *     the key as workers per machine and the value as a machine count
     */
    private Map<Integer, Integer> machineDistribution(TopologyDetails topology) {
        int machineNum = this.isoMachines.get(topology.getName()).intValue();
        int workerNum = topology.getNumWorkers();
        Map<Integer, Integer> distribution = Utils.integerDivided(workerNum, machineNum);
        // Removing an absent key is a no-op, so no containsKey check is needed.
        distribution.remove(0);
        return distribution;
    }

    /**
     * Checks that every assignment belongs to the given topology.
     *
     * @param assignments assignments to inspect
     * @param topologyId expected topology id
     * @return {@code true} if all assignments carry {@code topologyId} (also for an empty list)
     */
    private boolean checkAssignmentTopology(List<AssignmentInfo> assignments, String topologyId) {
        for (AssignmentInfo info : assignments) {
            boolean sameTopology = topologyId.equals(info.getTopologyId());
            if (!sameTopology) {
                return false;
            }
        }
        return true;
    }

    /**
     * Checks that the executor set of every assignment is one of the given worker specs.
     *
     * @param assigments assignments to inspect
     * @param workerSpecs outstanding worker specs
     * @return {@code true} if every assignment's executor set is contained in
     *     {@code workerSpecs} (also for an empty list)
     */
    private boolean checkAssignmentWorkerSpecs(List<AssignmentInfo> assigments, Set<Set<ExecutorDetails>> workerSpecs) {
        for (AssignmentInfo info : assigments) {
            Set<ExecutorDetails> assignedExecutors = info.getExecutors();
            if (!workerSpecs.contains(assignedExecutors)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Lowers the count stored under {@code value} by one.
     *
     * <p>Nothing happens when the key is absent. When the count would reach zero the key is
     * removed instead of being stored with a zero count.
     *
     * @param distribution distribution map to update in place
     * @param value key whose count is decremented
     */
    private void decrementDistribution(Map<Integer, Integer> distribution, int value) {
        Integer current = distribution.get(value);
        if (current == null) {
            return;
        }
        if (current.intValue() == 1) {
            distribution.remove(value);
            return;
        }
        distribution.put(value, current.intValue() - 1);
    }

    /**
     * Groups the cluster's used worker slots by host.
     *
     * @param cluster cluster whose used slots are read
     * @return a new map from host name (resolved from each slot's node id) to the used slots on
     *     that host
     */
    private Map<String, Set<WorkerSlot>> hostToUsedSlots(Cluster cluster) {
        Map<String, Set<WorkerSlot>> usedByHost = new HashMap<String, Set<WorkerSlot>>();
        for (WorkerSlot used : cluster.getUsedSlots()) {
            String hostName = cluster.getHost(used.getNodeId());
            Set<WorkerSlot> bucket = usedByHost.get(hostName);
            if (bucket == null) {
                bucket = new HashSet<WorkerSlot>();
                usedByHost.put(hostName, bucket);
            }
            bucket.add(used);
        }
        return usedByHost;
    }

    /**
     * Groups the cluster's assignable worker slots by host and orders the hosts by capacity.
     *
     * <p>The hosts are returned with the largest number of assignable slots first. Hosts with
     * the same number of slots appear in random relative order: the list is shuffled first and
     * then sorted with a stable sort. Shuffling after the sort would throw the ordering away,
     * and {@code schedule} only ever looks at the head of this list when it checks whether a
     * host has enough slots.
     *
     * @param cluster cluster whose assignable slots are read
     * @return a new list of hosts with their assignable slots, most slots first
     */
    private LinkedList<HostAssignableSlots> hostAssignableSlots(Cluster cluster) {
        Map<String, List<WorkerSlot>> slotsByHost = new HashMap<String, List<WorkerSlot>>();
        for (WorkerSlot slot : cluster.getAssignableSlots()) {
            String host = cluster.getHost(slot.getNodeId());
            List<WorkerSlot> slots = slotsByHost.get(host);
            if (slots == null) {
                slots = new ArrayList<WorkerSlot>();
                slotsByHost.put(host, slots);
            }
            slots.add(slot);
        }

        List<HostAssignableSlots> hosts = new ArrayList<HostAssignableSlots>();
        for (Map.Entry<String, List<WorkerSlot>> entry : slotsByHost.entrySet()) {
            hosts.add(new HostAssignableSlots(entry.getKey(), entry.getValue()));
        }

        // Randomize first so that the stable sort below keeps ties in random order.
        Collections.shuffle(hosts);
        Collections.sort(hosts, new Comparator<HostAssignableSlots>() {

            @Override
            public int compare(HostAssignableSlots o1, HostAssignableSlots o2) {
                // List sizes are non-negative, so this subtraction cannot overflow.
                return o2.getWorkerSlots().size() - o1.getWorkerSlots().size();
            }
        });
        return new LinkedList<HostAssignableSlots>(hosts);
    }

    /**
     * Expands a distribution map into a list of per-machine worker counts, largest first.
     *
     * <p>Each key is added to the result as many times as its value says, so a map of
     * {@code {3=2, 2=1}} becomes {@code [3, 3, 2]}.
     *
     * @param distributions map from workers-per-machine to number of machines
     * @return a new list sorted in descending order
     */
    private List<Integer> distributionToSortedAmounts(Map<Integer, Integer> distributions) {
        List<Integer> amounts = new ArrayList<Integer>();
        for (Map.Entry<Integer, Integer> entry : distributions.entrySet()) {
            int workers = entry.getKey();
            int machines = entry.getValue();
            for (int i = 0; i < machines; ++i) {
                amounts.add(workers);
            }
        }
        // Descending natural order; avoids a subtraction-based comparator, which can overflow.
        Collections.sort(amounts, Collections.<Integer>reverseOrder());
        return amounts;
    }

    /**
     * Lists the topologies that have no worker specs left to place.
     *
     * @param topologyToWorkerSpecs outstanding worker specs keyed by topology id
     * @return a new set with the ids whose worker-spec set is empty
     */
    private Set<String> allocatedTopologies(Map<String, Set<Set<ExecutorDetails>>> topologyToWorkerSpecs) {
        Set<String> fullyPlaced = new HashSet<String>();
        for (Map.Entry<String, Set<Set<ExecutorDetails>>> specsById : topologyToWorkerSpecs.entrySet()) {
            Set<Set<ExecutorDetails>> remaining = specsById.getValue();
            if (remaining.isEmpty()) {
                fullyPlaced.add(specsById.getKey());
            }
        }
        return fullyPlaced;
    }

    /**
     * Builds a {@code Topologies} holding every topology whose id is not in {@code filterIds}.
     *
     * @param topologies all topologies
     * @param filterIds ids of the topologies to leave out
     * @return a new {@code Topologies} with the remaining topologies keyed by id
     */
    private Topologies leftoverTopologies(Topologies topologies, Set<String> filterIds) {
        HashMap<String, TopologyDetails> remainingById = new HashMap<String, TopologyDetails>();
        for (TopologyDetails details : topologies.getTopologies()) {
            String topologyId = details.getId();
            if (!filterIds.contains(topologyId)) {
                remainingById.put(topologyId, details);
            }
        }
        return new Topologies(remainingById);
    }

    class HostAssignableSlots {
        private String hostName;
        private List<WorkerSlot> workerSlots;

        HostAssignableSlots(String hostName, List<WorkerSlot> workerSlots) {
            this.hostName = hostName;
            this.workerSlots = workerSlots;
        }

        public String getHostName() {
            return this.hostName;
        }

        public List<WorkerSlot> getWorkerSlots() {
            return this.workerSlots;
        }
    }

    class AssignmentInfo {
        private WorkerSlot workerSlot;
        private String topologyId;
        private Set<ExecutorDetails> executors;

        AssignmentInfo(WorkerSlot workerSlot, String topologyId, Set<ExecutorDetails> executors) {
            this.workerSlot = workerSlot;
            this.topologyId = topologyId;
            this.executors = executors;
        }

        public WorkerSlot getWorkerSlot() {
            return this.workerSlot;
        }

        public String getTopologyId() {
            return this.topologyId;
        }

        public Set<ExecutorDetails> getExecutors() {
            return this.executors;
        }
    }
}

