/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.model;

import com.linkedin.cruisecontrol.monitor.sampling.aggregator.AggregatedMetricValues;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityInfo;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Disk;
import com.linkedin.kafka.cruisecontrol.model.Host;
import com.linkedin.kafka.cruisecontrol.model.Load;
import com.linkedin.kafka.cruisecontrol.model.Partition;
import com.linkedin.kafka.cruisecontrol.model.Rack;
import com.linkedin.kafka.cruisecontrol.model.RawAndDerivedResource;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.math3.stat.descriptive.moment.Variance;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ClusterModel
implements Serializable {
    private static final long serialVersionUID = -6840253566423285966L;
    private static final Broker GENESIS_BROKER = new Broker(null, -1, new BrokerCapacityInfo(MonitorUtils.EMPTY_BROKER_CAPACITY), false);
    private final ModelGeneration _generation;
    private final Map<String, Rack> _racksById;
    private final Map<Integer, Rack> _brokerIdToRack;
    private final Map<TopicPartition, Partition> _partitionsByTopicPartition;
    private final Set<Replica> _selfHealingEligibleReplicas;
    private final SortedSet<Broker> _newBrokers;
    private final SortedSet<Broker> _brokersWithBadDisks;
    private final Set<Broker> _aliveBrokers;
    private final SortedSet<Broker> _deadBrokers;
    private final SortedSet<Broker> _brokers;
    private final double _monitoredPartitionsRatio;
    private final double[] _clusterCapacity;
    private final Load _load;
    private int _maxReplicationFactor;
    private final Map<String, Integer> _replicationFactorByTopic;
    private final Map<Integer, Load> _potentialLeadershipLoadByBrokerId;
    private int _unknownHostId;
    private final Map<Integer, String> _capacityEstimationInfoByBrokerId;

    public ClusterModel(ModelGeneration generation, double monitoredPartitionsRatio) {
        this._generation = generation;
        this._racksById = new HashMap<String, Rack>();
        this._brokerIdToRack = new HashMap<Integer, Rack>();
        this._partitionsByTopicPartition = new HashMap<TopicPartition, Partition>();
        this._selfHealingEligibleReplicas = new HashSet<Replica>();
        this._newBrokers = new TreeSet<Broker>();
        this._brokersWithBadDisks = new TreeSet<Broker>();
        this._aliveBrokers = new HashSet<Broker>();
        this._brokers = new TreeSet<Broker>();
        this._deadBrokers = new TreeSet<Broker>();
        this._load = new Load();
        this._clusterCapacity = new double[Resource.cachedValues().size()];
        this._maxReplicationFactor = 1;
        this._replicationFactorByTopic = new HashMap<String, Integer>();
        this._potentialLeadershipLoadByBrokerId = new HashMap<Integer, Load>();
        this._monitoredPartitionsRatio = monitoredPartitionsRatio;
        this._unknownHostId = 0;
        this._capacityEstimationInfoByBrokerId = new HashMap<Integer, String>();
    }

    public ModelGeneration generation() {
        return this._generation;
    }

    public double monitoredPartitionsRatio() {
        return this._monitoredPartitionsRatio;
    }

    public ClusterModelStats getClusterStats(BalancingConstraint balancingConstraint) {
        return new ClusterModelStats().populate(this, balancingConstraint);
    }

    public Rack rack(String rackId) {
        return this._racksById.get(rackId);
    }

    public Map<TopicPartition, List<ReplicaPlacementInfo>> getReplicaDistribution() {
        HashMap<TopicPartition, List<ReplicaPlacementInfo>> replicaDistribution = new HashMap<TopicPartition, List<ReplicaPlacementInfo>>(this._partitionsByTopicPartition.size());
        for (Map.Entry<TopicPartition, Partition> entry : this._partitionsByTopicPartition.entrySet()) {
            TopicPartition tp = entry.getKey();
            Partition partition = entry.getValue();
            List replicaPlacementInfos = partition.replicas().stream().map(r -> r.disk() == null ? new ReplicaPlacementInfo(r.broker().id()) : new ReplicaPlacementInfo(r.broker().id(), r.disk().logDir())).collect(Collectors.toList());
            replicaDistribution.put(tp, replicaPlacementInfos);
        }
        return replicaDistribution;
    }

    public Map<TopicPartition, ReplicaPlacementInfo> getLeaderDistribution() {
        HashMap<TopicPartition, ReplicaPlacementInfo> leaders = new HashMap<TopicPartition, ReplicaPlacementInfo>(this._partitionsByTopicPartition.size());
        for (Map.Entry<TopicPartition, Partition> entry : this._partitionsByTopicPartition.entrySet()) {
            Replica leaderReplica = entry.getValue().leader();
            if (leaderReplica.disk() == null) {
                leaders.put(entry.getKey(), new ReplicaPlacementInfo(leaderReplica.broker().id()));
                continue;
            }
            leaders.put(entry.getKey(), new ReplicaPlacementInfo(leaderReplica.broker().id(), leaderReplica.disk().logDir()));
        }
        return leaders;
    }

    public Set<Replica> selfHealingEligibleReplicas() {
        return this._selfHealingEligibleReplicas;
    }

    public Load load() {
        return this._load;
    }

    public Load potentialLeadershipLoadFor(Integer brokerId) {
        return this._potentialLeadershipLoadByBrokerId.get(brokerId);
    }

    public int maxReplicationFactor() {
        return this._maxReplicationFactor;
    }

    public Map<String, Integer> replicationFactorByTopic() {
        return this._replicationFactorByTopic;
    }

    public Partition partition(TopicPartition tp) {
        return this._partitionsByTopicPartition.get(tp);
    }

    public SortedMap<String, List<Partition>> getPartitionsByTopic() {
        TreeMap<String, List<Partition>> partitionsByTopic = new TreeMap<String, List<Partition>>();
        for (String string : this.topics()) {
            partitionsByTopic.put(string, new ArrayList());
        }
        for (Map.Entry entry : this._partitionsByTopicPartition.entrySet()) {
            ((List)partitionsByTopic.get(((TopicPartition)entry.getKey()).topic())).add((Partition)entry.getValue());
        }
        return partitionsByTopic;
    }

    public Set<Replica> leaderReplicas() {
        return this._partitionsByTopicPartition.values().stream().map(Partition::leader).collect(Collectors.toSet());
    }

    public void setBrokerState(int brokerId, Broker.State newState) {
        Broker broker = this.broker(brokerId);
        if (broker == null) {
            throw new IllegalArgumentException("Broker " + brokerId + " does not exist.");
        }
        broker.rack().setBrokerState(brokerId, newState);
        this._selfHealingEligibleReplicas.addAll(broker.currentOfflineReplicas());
        this.refreshCapacity();
        switch (newState) {
            case DEAD: {
                this._aliveBrokers.remove(broker);
                this._deadBrokers.add(broker);
                this._brokersWithBadDisks.remove(broker);
                break;
            }
            case NEW: {
                this._newBrokers.add(broker);
            }
            case DEMOTED: 
            case ALIVE: {
                this._aliveBrokers.add(broker);
                this._deadBrokers.remove(broker);
                this._brokersWithBadDisks.remove(broker);
                break;
            }
            case BAD_DISKS: {
                this._aliveBrokers.add(broker);
                this._deadBrokers.remove(broker);
                this._brokersWithBadDisks.add(broker);
                for (Replica replica : broker.currentOfflineReplicas()) {
                    this._partitionsByTopicPartition.get(replica.topicPartition()).addIneligibleBroker(broker);
                }
                break;
            }
            default: {
                throw new IllegalArgumentException("Illegal broker state " + newState + " is provided.");
            }
        }
    }

    void markDiskDead(int brokerId, String logdir) {
        Broker broker = this.broker(brokerId);
        if (broker == null) {
            throw new IllegalArgumentException("Broker " + brokerId + " does not exist.");
        }
        broker.rack().markDiskDead(brokerId, logdir);
        this._selfHealingEligibleReplicas.addAll(broker.currentOfflineReplicas());
        this.refreshCapacity();
    }

    public void relocateReplica(TopicPartition tp, int brokerId, String destinationLogdir) {
        Replica replicaToMove = this._partitionsByTopicPartition.get(tp).replica(brokerId);
        replicaToMove.broker().moveReplicaBetweenDisks(tp, replicaToMove.disk().logDir(), destinationLogdir);
    }

    public void relocateReplica(TopicPartition tp, int sourceBrokerId, int destinationBrokerId) {
        Replica replica = this.removeReplica(sourceBrokerId, tp);
        if (replica == null) {
            throw new IllegalArgumentException("Replica is not in the cluster.");
        }
        replica.setBroker(this.broker(destinationBrokerId));
        replica.broker().rack().addReplica(replica);
        this._load.addLoad(replica.load());
        this._potentialLeadershipLoadByBrokerId.get(destinationBrokerId).addLoad(this.partition(tp).leader().load());
    }

    public boolean relocateLeadership(TopicPartition tp, int sourceBrokerId, int destinationBrokerId) {
        Replica sourceReplica = this._partitionsByTopicPartition.get(tp).replica(sourceBrokerId);
        if (!sourceReplica.isLeader()) {
            return false;
        }
        Replica destinationReplica = this._partitionsByTopicPartition.get(tp).replica(destinationBrokerId);
        if (destinationReplica.isLeader()) {
            throw new IllegalArgumentException("Cannot relocate leadership of partition " + tp + "from broker " + sourceBrokerId + " to broker " + destinationBrokerId + " because the destination replica is a leader.");
        }
        Rack rack = this.broker(sourceBrokerId).rack();
        AggregatedMetricValues leadershipLoadDelta = rack.makeFollower(sourceBrokerId, tp);
        rack = this.broker(destinationBrokerId).rack();
        rack.makeLeader(destinationBrokerId, tp, leadershipLoadDelta);
        Partition partition = this._partitionsByTopicPartition.get(tp);
        partition.relocateLeadership(destinationReplica);
        return true;
    }

    public Set<Broker> aliveBrokers() {
        return this._aliveBrokers;
    }

    public SortedSet<Broker> deadBrokers() {
        return new TreeSet<Broker>(this._deadBrokers);
    }

    public SortedSet<Broker> brokenBrokers() {
        TreeSet<Broker> brokenBrokers = new TreeSet<Broker>(this._deadBrokers);
        brokenBrokers.addAll(this.brokersWithBadDisks());
        return Collections.unmodifiableSortedSet(brokenBrokers);
    }

    public Map<Integer, String> capacityEstimationInfoByBrokerId() {
        return Collections.unmodifiableMap(this._capacityEstimationInfoByBrokerId);
    }

    public SortedSet<Broker> demotedBrokers() {
        TreeSet<Broker> demotedBrokers = new TreeSet<Broker>();
        for (Rack rack : this._racksById.values()) {
            rack.brokers().forEach(b -> {
                if (b.isDemoted()) {
                    demotedBrokers.add((Broker)b);
                }
            });
        }
        return demotedBrokers;
    }

    public SortedSet<Broker> newBrokers() {
        return this._newBrokers;
    }

    public SortedSet<Broker> brokersWithBadDisks() {
        return Collections.unmodifiableSortedSet(this._brokersWithBadDisks);
    }

    public Set<Broker> brokersHavingOfflineReplicasOnBadDisks() {
        HashSet<Broker> brokersWithOfflineReplicasOnBadDisks = new HashSet<Broker>();
        for (Broker brokerWithBadDisks : this._brokersWithBadDisks) {
            if (brokerWithBadDisks.currentOfflineReplicas().isEmpty()) continue;
            brokersWithOfflineReplicasOnBadDisks.add(brokerWithBadDisks);
        }
        return brokersWithOfflineReplicasOnBadDisks;
    }

    public boolean isClusterAlive() {
        for (Rack rack : this._racksById.values()) {
            if (!rack.isRackAlive()) continue;
            return true;
        }
        return false;
    }

    public void clearLoad() {
        this._racksById.values().forEach(Rack::clearLoad);
        this._load.clearLoad();
    }

    public Replica removeReplica(int brokerId, TopicPartition tp) {
        for (Rack rack : this._racksById.values()) {
            Replica removedReplica = rack.removeReplica(brokerId, tp);
            if (removedReplica == null) continue;
            this._load.subtractLoad(removedReplica.load());
            this._potentialLeadershipLoadByBrokerId.get(brokerId).subtractLoad(this.partition(tp).leader().load());
            return removedReplica;
        }
        return null;
    }

    public SortedSet<Broker> brokers() {
        return new TreeSet<Broker>(this._brokers);
    }

    public Broker broker(int brokerId) {
        Rack rack = this._brokerIdToRack.get(brokerId);
        return rack == null ? null : rack.broker(brokerId);
    }

    void trackSortedReplicas(String sortName, Set<Function<Replica, Boolean>> selectionFuncs, List<Function<Replica, Integer>> priorityFuncs, Function<Replica, Double> scoreFunc) {
        this._brokers.forEach(b -> b.trackSortedReplicas(sortName, selectionFuncs, priorityFuncs, scoreFunc));
    }

    public void untrackSortedReplicas(String sortName) {
        this._brokers.forEach(b -> b.untrackSortedReplicas(sortName));
    }

    public void clearSortedReplicas() {
        this._brokers.forEach(Broker::clearSortedReplicas);
    }

    public void clear() {
        this._racksById.clear();
        this._partitionsByTopicPartition.clear();
        this._load.clearLoad();
        this._maxReplicationFactor = 1;
        this._replicationFactorByTopic.clear();
        this._capacityEstimationInfoByBrokerId.clear();
    }

    public int numAliveRacks() {
        int numAliveRacks = 0;
        for (Rack rack : this._racksById.values()) {
            if (!rack.isRackAlive()) continue;
            ++numAliveRacks;
        }
        return numAliveRacks;
    }

    public int numTopicReplicas(String topic) {
        int numTopicReplicas = 0;
        for (Rack rack : this._racksById.values()) {
            numTopicReplicas += rack.numTopicReplicas(topic);
        }
        return numTopicReplicas;
    }

    public int numLeaderReplicas() {
        return this._partitionsByTopicPartition.size();
    }

    public int numReplicas() {
        return this._partitionsByTopicPartition.values().stream().mapToInt(p -> p.replicas().size()).sum();
    }

    public Set<String> topics() {
        HashSet<String> topics = new HashSet<String>();
        for (Rack rack : this._racksById.values()) {
            topics.addAll(rack.topics());
        }
        return topics;
    }

    public double capacityFor(Resource resource) {
        return this._clusterCapacity[resource.id()];
    }

    public void setReplicaLoad(String rackId, int brokerId, TopicPartition tp, AggregatedMetricValues metricValues, List<Long> windows) {
        if (!this.broker(brokerId).replica(tp).load().isEmpty()) {
            throw new IllegalStateException(String.format("The load for %s on broker %d, rack %s already has metric values.", tp, brokerId, rackId));
        }
        Rack rack = this.rack(rackId);
        rack.setReplicaLoad(brokerId, tp, metricValues, windows);
        this._load.addMetricValues(metricValues, windows);
        Replica leader = this.partition(tp).leader();
        if (leader != null && leader.broker().id() == brokerId) {
            for (Replica replica : this.partition(tp).replicas()) {
                this._potentialLeadershipLoadByBrokerId.get(replica.broker().id()).addMetricValues(metricValues, windows);
            }
        }
    }

    public void handleDeadBroker(String rackId, int brokerId, BrokerCapacityInfo brokerCapacityInfo) {
        if (this.rack(rackId) == null) {
            this.createRack(rackId);
        }
        if (this.broker(brokerId) == null) {
            this.createBroker(rackId, String.format("UNKNOWN_HOST-%d", this._unknownHostId++), brokerId, brokerCapacityInfo, false);
        }
    }

    public Replica createReplica(String rackId, int brokerId, TopicPartition tp, int index, boolean isLeader) {
        return this.createReplica(rackId, brokerId, tp, index, isLeader, !this.broker(brokerId).isAlive(), null, false);
    }

    public Replica createReplica(String rackId, int brokerId, TopicPartition tp, int index, boolean isLeader, boolean isOffline, String logdir, boolean isFuture) {
        Replica replica;
        Broker broker = this.broker(brokerId);
        if (!isFuture) {
            Disk disk = null;
            if (logdir != null && (disk = broker.disk(logdir)) == null) {
                if (isOffline) {
                    disk = broker.addDeadDisk(logdir);
                } else {
                    throw new IllegalStateException("Missing disk information for disk " + logdir + " on broker " + this);
                }
            }
            replica = new Replica(tp, broker, isLeader, isOffline, disk);
        } else {
            replica = new Replica(tp, GENESIS_BROKER, false);
            replica.setBroker(broker);
        }
        this.rack(rackId).addReplica(replica);
        if (!this._partitionsByTopicPartition.containsKey(tp)) {
            this._partitionsByTopicPartition.put(tp, new Partition(tp));
            this._replicationFactorByTopic.putIfAbsent(tp.topic(), 1);
        }
        Partition partition = this._partitionsByTopicPartition.get(tp);
        if (replica.isLeader()) {
            partition.addLeader(replica, index);
            return replica;
        }
        partition.addFollower(replica, index);
        Replica leaderReplica = this.partition(tp).leader();
        if (leaderReplica != null) {
            this._potentialLeadershipLoadByBrokerId.get(brokerId).addLoad(leaderReplica.load());
        }
        int replicationFactor = Math.max(this._replicationFactorByTopic.get(tp.topic()), partition.followers().size() + 1);
        this._replicationFactorByTopic.put(tp.topic(), replicationFactor);
        this._maxReplicationFactor = Math.max(this._maxReplicationFactor, replicationFactor);
        return replica;
    }

    public void deleteReplica(TopicPartition topicPartition, int brokerId) {
        int currentReplicaCount = this._partitionsByTopicPartition.get(topicPartition).replicas().size();
        if (currentReplicaCount < 2) {
            throw new IllegalStateException(String.format("Unable to delete replica for topic partition %s since it only has %d replicas.", topicPartition, currentReplicaCount));
        }
        this.removeReplica(brokerId, topicPartition);
        Partition partition = this._partitionsByTopicPartition.get(topicPartition);
        partition.deleteReplica(brokerId);
        this._replicationFactorByTopic.put(topicPartition.topic(), partition.replicas().size());
    }

    public void refreshClusterMaxReplicationFactor() {
        this._maxReplicationFactor = this._replicationFactorByTopic.values().stream().max(Integer::compareTo).orElse(0);
    }

    public Broker createBroker(String rackId, String host, int brokerId, BrokerCapacityInfo brokerCapacityInfo, boolean populateReplicaPlacementInfo) {
        this._potentialLeadershipLoadByBrokerId.putIfAbsent(brokerId, new Load());
        Rack rack = this.rack(rackId);
        this._brokerIdToRack.put(brokerId, rack);
        if (brokerCapacityInfo.isEstimated()) {
            this._capacityEstimationInfoByBrokerId.put(brokerId, brokerCapacityInfo.estimationInfo());
        }
        Broker broker = rack.createBroker(brokerId, host, brokerCapacityInfo, populateReplicaPlacementInfo);
        this._aliveBrokers.add(broker);
        this._brokers.add(broker);
        this.refreshCapacity();
        return broker;
    }

    public Rack createRack(String rackId) {
        Rack rack = new Rack(rackId);
        return this._racksById.putIfAbsent(rackId, rack);
    }

    public void createOrDeleteReplicas(Map<Short, Set<String>> topicsByReplicationFactor, Map<String, List<Integer>> brokersByRack, Map<Integer, String> rackByBroker, Cluster cluster) {
        boolean needToRefreshClusterMaxReplicationFactor = false;
        for (Map.Entry<Short, Set<String>> entry : topicsByReplicationFactor.entrySet()) {
            short replicationFactor = entry.getKey();
            Set<String> topics = entry.getValue();
            for (String topic : topics) {
                ArrayList<String> racks = new ArrayList<String>(brokersByRack.keySet());
                int[] cursors = new int[racks.size()];
                int rackCursor = 0;
                for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
                    if (partitionInfo.replicas().length == replicationFactor) continue;
                    ArrayList<Integer> newAssignedReplica = new ArrayList<Integer>();
                    if (partitionInfo.replicas().length < replicationFactor) {
                        HashSet<String> currentOccupiedRack = new HashSet<String>();
                        Node[] nodeArray = partitionInfo.replicas();
                        int n = nodeArray.length;
                        for (int i = 0; i < n; ++i) {
                            Node node = nodeArray[i];
                            newAssignedReplica.add(node.id());
                            currentOccupiedRack.add(rackByBroker.get(node.id()));
                        }
                        while (newAssignedReplica.size() < replicationFactor) {
                            String rack = (String)racks.get(rackCursor);
                            if (!currentOccupiedRack.contains(rack) || currentOccupiedRack.size() == racks.size()) {
                                int cursor = cursors[rackCursor];
                                Integer brokerId = brokersByRack.get(rack).get(cursor);
                                if (!newAssignedReplica.contains(brokerId)) {
                                    newAssignedReplica.add(brokersByRack.get(rack).get(cursor));
                                    TopicPartition tp = new TopicPartition(topic, partitionInfo.partition());
                                    Load load = this.partition(tp).leader().getFollowerLoadFromLeader();
                                    this.createReplica(rack, brokerId, tp, partitionInfo.replicas().length, false, false, null, true);
                                    this.setReplicaLoad(rack, brokerId, tp, load.loadByWindows(), load.windows());
                                    currentOccupiedRack.add(rack);
                                }
                                cursors[rackCursor] = (cursor + 1) % brokersByRack.get(rack).size();
                            }
                            rackCursor = (rackCursor + 1) % racks.size();
                        }
                        continue;
                    }
                    newAssignedReplica.add(partitionInfo.leader().id());
                    for (Node node : partitionInfo.replicas()) {
                        if (node.id() == ((Integer)newAssignedReplica.get(0)).intValue()) continue;
                        if (newAssignedReplica.size() < replicationFactor) {
                            newAssignedReplica.add(node.id());
                            continue;
                        }
                        this.deleteReplica(new TopicPartition(topic, partitionInfo.partition()), node.id());
                        needToRefreshClusterMaxReplicationFactor = true;
                    }
                }
            }
        }
        if (needToRefreshClusterMaxReplicationFactor) {
            this.refreshClusterMaxReplicationFactor();
        }
    }

    public List<Broker> sortedAliveBrokersUnderThreshold(Resource resource, double utilizationThreshold) {
        List<Broker> sortedTargetBrokersUnderCapacityLimit = this.aliveBrokersUnderThreshold(resource, utilizationThreshold);
        sortedTargetBrokersUnderCapacityLimit.sort((o1, o2) -> {
            double expectedBrokerLoad1 = o1.load().expectedUtilizationFor(resource);
            double expectedBrokerLoad2 = o2.load().expectedUtilizationFor(resource);
            int hostComparison = 0;
            if (resource.isHostResource()) {
                double expectedHostLoad1 = o1.host().load().expectedUtilizationFor(resource);
                double expectedHostLoad2 = o2.host().load().expectedUtilizationFor(resource);
                hostComparison = Double.compare(expectedHostLoad1, expectedHostLoad2);
            }
            return hostComparison == 0 ? Double.compare(expectedBrokerLoad1, expectedBrokerLoad2) : hostComparison;
        });
        return sortedTargetBrokersUnderCapacityLimit;
    }

    public List<Broker> aliveBrokersUnderThreshold(Resource resource, double utilizationThreshold) {
        ArrayList<Broker> aliveBrokersUnderThreshold = new ArrayList<Broker>();
        for (Broker aliveBroker : this.aliveBrokers()) {
            if (resource.isBrokerResource()) {
                double brokerCapacityLimit = aliveBroker.capacityFor(resource) * utilizationThreshold;
                double brokerUtilization = aliveBroker.load().expectedUtilizationFor(resource);
                if (brokerUtilization >= brokerCapacityLimit) continue;
            }
            if (resource.isHostResource()) {
                double hostCapacityLimit = aliveBroker.host().capacityFor(resource) * utilizationThreshold;
                double hostUtilization = aliveBroker.host().load().expectedUtilizationFor(resource);
                if (hostUtilization >= hostCapacityLimit) continue;
            }
            aliveBrokersUnderThreshold.add(aliveBroker);
        }
        return aliveBrokersUnderThreshold;
    }

    public List<Broker> aliveBrokersOverThreshold(Resource resource, double utilizationThreshold) {
        ArrayList<Broker> aliveBrokersOverThreshold = new ArrayList<Broker>();
        for (Broker aliveBroker : this.aliveBrokers()) {
            if (resource.isBrokerResource()) {
                double brokerCapacityLimit = aliveBroker.capacityFor(resource) * utilizationThreshold;
                double brokerUtilization = aliveBroker.load().expectedUtilizationFor(resource);
                if (brokerUtilization <= brokerCapacityLimit) continue;
            }
            if (resource.isHostResource()) {
                double hostCapacityLimit = aliveBroker.host().capacityFor(resource) * utilizationThreshold;
                double hostUtilization = aliveBroker.host().load().expectedUtilizationFor(resource);
                if (hostUtilization <= hostCapacityLimit) continue;
            }
            aliveBrokersOverThreshold.add(aliveBroker);
        }
        return aliveBrokersOverThreshold;
    }

    public List<Partition> replicasSortedByUtilization(Resource resource, boolean wantMaxLoad, boolean wantAvgLoad) {
        ArrayList<Partition> partitionList = new ArrayList<Partition>(this._partitionsByTopicPartition.values());
        partitionList.sort((o1, o2) -> Double.compare(o2.leader().load().expectedUtilizationFor(resource, wantMaxLoad, wantAvgLoad), o1.leader().load().expectedUtilizationFor(resource, wantMaxLoad, wantAvgLoad)));
        return partitionList;
    }

    public void sanityCheck() {
        HashMap<String, Integer> errorMsgAndNumWindows = new HashMap<String, Integer>();
        int expectedNumWindows = this._load.numWindows();
        for (Map.Entry<Integer, Load> entry : this._potentialLeadershipLoadByBrokerId.entrySet()) {
            int n = entry.getKey();
            Load load = entry.getValue();
            if (load.numWindows() == expectedNumWindows || this.broker(n).replicas().size() == 0) continue;
            errorMsgAndNumWindows.put(String.format("Leadership(%d)", n), load.numWindows());
        }
        for (Rack rack : this._racksById.values()) {
            if (rack.load().numWindows() != expectedNumWindows && rack.replicas().size() != 0) {
                errorMsgAndNumWindows.put(String.format("Rack(%s)", rack.id()), rack.load().numWindows());
            }
            for (Host host : rack.hosts()) {
                if (host.load().numWindows() != expectedNumWindows && host.replicas().size() != 0) {
                    errorMsgAndNumWindows.put(String.format("Host(%s)", host.name()), host.load().numWindows());
                }
                for (Broker broker : rack.brokers()) {
                    if (broker.load().numWindows() != expectedNumWindows && broker.replicas().size() != 0) {
                        errorMsgAndNumWindows.put(String.format("Broker(%d)", broker.id()), broker.load().numWindows());
                    }
                    for (Replica replica : broker.replicas()) {
                        if (replica.load().numWindows() == expectedNumWindows) continue;
                        errorMsgAndNumWindows.put(String.format("Replica(%s-%d)", replica.topicPartition(), broker.id()), replica.load().numWindows());
                    }
                }
            }
        }
        StringBuilder exceptionMsg = new StringBuilder();
        for (Map.Entry entry : errorMsgAndNumWindows.entrySet()) {
            exceptionMsg.append(String.format("[%s: %d]%n", entry.getKey(), entry.getValue()));
        }
        if (exceptionMsg.length() > 0) {
            throw new IllegalArgumentException(String.format("Loads must have all have %d windows. Following loads violate this constraint with specified number of windows: %s", expectedNumWindows, exceptionMsg));
        }
        String string = "Inconsistent load distribution.";
        for (Broker broker : this.brokers()) {
            for (Resource resource : Resource.cachedValues()) {
                double sumOfReplicaUtilization = 0.0;
                for (Replica replica : broker.replicas()) {
                    sumOfReplicaUtilization += replica.load().expectedUtilizationFor(resource);
                }
                double brokerUtilization = broker.load().expectedUtilizationFor(resource);
                if (AnalyzerUtils.compare(sumOfReplicaUtilization, brokerUtilization, resource) == 0) continue;
                throw new IllegalArgumentException(String.format("%s Broker utilization for %s is different from the total replica utilization in the broker with id: %d. Sum of the replica utilization: %f, broker utilization: %f", new Object[]{string, resource, broker.id(), sumOfReplicaUtilization, brokerUtilization}));
            }
        }
        HashMap<Resource, Double> hashMap = new HashMap<Resource, Double>(Resource.cachedValues().size());
        for (Rack rack : this._racksById.values()) {
            HashMap<Resource, Double> sumOfHostUtilizationByResource = new HashMap<Resource, Double>(Resource.cachedValues().size());
            for (Host host : rack.hosts()) {
                for (Resource resource : Resource.cachedValues()) {
                    double sumOfBrokerUtilization = 0.0;
                    for (Broker broker : host.brokers()) {
                        sumOfBrokerUtilization += broker.load().expectedUtilizationFor(resource);
                    }
                    double hostUtilization = host.load().expectedUtilizationFor(resource);
                    if (AnalyzerUtils.compare(sumOfBrokerUtilization, hostUtilization, resource) != 0) {
                        throw new IllegalArgumentException(String.format("%s Host utilization for %s is different from the total broker utilization in the host : %s. Sum of the broker utilization: %f, host utilization: %f", new Object[]{string, resource, host.name(), sumOfBrokerUtilization, hostUtilization}));
                    }
                    sumOfHostUtilizationByResource.compute(resource, (k, v) -> (v == null ? 0.0 : v) + hostUtilization);
                }
            }
            for (Map.Entry entry : sumOfHostUtilizationByResource.entrySet()) {
                Resource resource = (Resource)((Object)entry.getKey());
                double sumOfHostsUtil = (Double)entry.getValue();
                double rackUtilization = rack.load().expectedUtilizationFor(resource);
                if (AnalyzerUtils.compare(rackUtilization, sumOfHostsUtil, resource) != 0) {
                    throw new IllegalArgumentException(String.format("%s Rack utilization for %s is different from the total host utilization in rack : %s. Sum of the host utilization: %f, rack utilization: %f", new Object[]{string, resource, rack.id(), sumOfHostsUtil, rackUtilization}));
                }
                hashMap.compute(resource, (k, v) -> (v == null ? 0.0 : v) + sumOfHostsUtil);
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            Resource resource;
            resource = (Resource)((Object)entry.getKey());
            double sumOfRackUtil = (Double)entry.getValue();
            double clusterUtilization = this._load.expectedUtilizationFor(resource);
            if (AnalyzerUtils.compare(this._load.expectedUtilizationFor(resource), sumOfRackUtil, resource) == 0) continue;
            throw new IllegalArgumentException(String.format("%s Cluster utilization for %s is different from the total rack utilization in the cluster. Sum of the rack utilization: %f, cluster utilization: %f", new Object[]{string, resource, sumOfRackUtil, clusterUtilization}));
        }
        for (Broker broker : this.brokers()) {
            double sumOfLeaderOfReplicaUtilization = 0.0;
            for (Replica replica : broker.replicas()) {
                sumOfLeaderOfReplicaUtilization += this.partition(replica.topicPartition()).leader().load().expectedUtilizationFor(Resource.NW_OUT);
            }
            double d = this._potentialLeadershipLoadByBrokerId.get(broker.id()).expectedUtilizationFor(Resource.NW_OUT);
            if (AnalyzerUtils.compare(sumOfLeaderOfReplicaUtilization, d, Resource.NW_OUT) != 0) {
                throw new IllegalArgumentException(String.format("%s Leadership utilization for %s is different from the total utilization leader of replicas in the broker with id: %d. Expected: %f Received: %f", new Object[]{string, Resource.NW_OUT, broker.id(), sumOfLeaderOfReplicaUtilization, d}));
            }
            for (Resource resource : Resource.cachedValues()) {
                double cachedLoad;
                double leaderSum;
                if (resource == Resource.CPU || AnalyzerUtils.compare(leaderSum = broker.leaderReplicas().stream().mapToDouble(r -> r.load().expectedUtilizationFor(resource)).sum(), cachedLoad = broker.leadershipLoadForNwResources().expectedUtilizationFor(resource), resource) == 0) continue;
                throw new IllegalArgumentException(String.format("%s Leadership load for resource %s is %f but recomputed sum is %f", new Object[]{string, resource, cachedLoad, leaderSum}));
            }
        }
    }

    public BrokerStats brokerStats(KafkaCruiseControlConfig config) {
        BrokerStats brokerStats = new BrokerStats(config);
        this.brokers().forEach(broker -> brokerStats.addSingleBrokerStats((Broker)broker, this.potentialLeadershipLoadFor(broker.id()).expectedUtilizationFor(Resource.NW_OUT), this._capacityEstimationInfoByBrokerId.get(broker.id()) != null));
        return brokerStats;
    }

    public double[] variance() {
        RawAndDerivedResource[] resources = RawAndDerivedResource.values();
        double[][] utilization = this.utilizationMatrix();
        double[] variance = new double[resources.length];
        for (int resourceIndex = 0; resourceIndex < resources.length; ++resourceIndex) {
            Variance varianceCalculator = new Variance();
            variance[resourceIndex] = varianceCalculator.evaluate(utilization[resourceIndex]);
        }
        return variance;
    }

    public double[][] utilizationMatrix() {
        RawAndDerivedResource[] resources = RawAndDerivedResource.values();
        double[][] utilization = new double[resources.length][this.brokers().size()];
        int brokerIndex = 0;
        for (Broker broker : this.brokers()) {
            double leaderBytesInRate = broker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN);
            block8: for (RawAndDerivedResource derivedResource : resources) {
                switch (derivedResource) {
                    case DISK: 
                    case NW_OUT: 
                    case CPU: {
                        utilization[derivedResource.ordinal()][brokerIndex] = broker.load().expectedUtilizationFor(derivedResource.derivedFrom());
                        continue block8;
                    }
                    case FOLLOWER_NW_IN: {
                        utilization[derivedResource.ordinal()][brokerIndex] = broker.load().expectedUtilizationFor(derivedResource.derivedFrom()) - leaderBytesInRate;
                        continue block8;
                    }
                    case LEADER_NW_IN: {
                        utilization[derivedResource.ordinal()][brokerIndex] = leaderBytesInRate;
                        continue block8;
                    }
                    case PWN_NW_OUT: {
                        utilization[derivedResource.ordinal()][brokerIndex] = this.potentialLeadershipLoadFor(broker.id()).expectedUtilizationFor(Resource.NW_OUT);
                        continue block8;
                    }
                    case REPLICAS: {
                        utilization[derivedResource.ordinal()][brokerIndex] = broker.replicas().size();
                        continue block8;
                    }
                    default: {
                        throw new IllegalStateException("Unhandled case " + derivedResource + ".");
                    }
                }
            }
            ++brokerIndex;
        }
        return utilization;
    }

    public void writeTo(OutputStream out) throws IOException {
        String cluster = String.format("<Cluster maxPartitionReplicationFactor=\"%d\">%n", this._maxReplicationFactor);
        out.write(cluster.getBytes(StandardCharsets.UTF_8));
        for (Rack rack : this._racksById.values()) {
            rack.writeTo(out);
        }
        out.write("</Cluster>".getBytes(StandardCharsets.UTF_8));
    }

    public String toString() {
        return String.format("ClusterModel[brokerCount=%d,partitionCount=%d,aliveBrokerCount=%d]", this._brokers.size(), this._partitionsByTopicPartition.size(), this._aliveBrokers.size());
    }

    private void refreshCapacity() {
        for (Resource r : Resource.cachedValues()) {
            double capacity = 0.0;
            for (Rack rack : this._racksById.values()) {
                capacity += rack.capacityFor(r);
            }
            this._clusterCapacity[r.id()] = capacity;
        }
    }
}

