package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionAbstractGoal;
import com.linkedin.kafka.cruisecontrol.common.Statistic;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.model.SortedReplicasHelper;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/TopicReplicaDistributionGoal.class */
public class TopicReplicaDistributionGoal extends AbstractGoal {
    /** Logger used by this goal. */
    private static final Logger LOG = LoggerFactory.getLogger(TopicReplicaDistributionGoal.class);
    /** Factor multiplied with (balance percentage - 1) when the per-topic balance limits are derived. */
    private static final double BALANCE_MARGIN = 0.9d;
    /**
     * When true, moving a currently-offline replica to another broker is accepted without checking the balance
     * limits. Reset to false by initGoalState and set to true by updateGoalState the first time its sanity
     * checks fail.
     */
    private boolean _fixOfflineReplicasOnly;
    /** Topic -> ids of brokers for which moving replicas out was reported as insufficient; cleared by updateGoalState. */
    private final Map<String, Set<Integer>> _brokerIdsAboveBalanceUpperLimitByTopic;
    /** Topic -> ids of brokers for which moving replicas in was reported as insufficient; cleared by updateGoalState. */
    private final Map<String, Set<Integer>> _brokerIdsUnderBalanceLowerLimitByTopic;
    /**
     * Topic -> number of topic replicas divided by the number of alive brokers. Only topics that are to be
     * rebalanced keep an entry; a missing entry marks the topic as excluded from rebalancing.
     */
    private final Map<String, Double> _avgTopicReplicasOnAliveBroker;
    /** Topic -> maximum number of replicas of that topic an alive broker may hold to be considered balanced. */
    private final Map<String, Integer> _balanceUpperLimitByTopic;
    /** Topic -> minimum number of replicas of that topic an alive broker should hold to be considered balanced. */
    private final Map<String, Integer> _balanceLowerLimitByTopic;

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/TopicReplicaDistributionGoal$TopicReplicaDistrGoalStatsComparator.class */
    /**
     * Orders two {@link ClusterModelStats} by the standard deviation of their topic replica statistics and
     * remembers a human-readable explanation whenever the comparison turns out negative.
     */
    private class TopicReplicaDistrGoalStatsComparator implements Goal.ClusterModelStatsComparator {
        /** Explanation written by the latest {@link #compare} call that returned a negative value. */
        private String _reasonForLastNegativeResult;

        private TopicReplicaDistrGoalStatsComparator() {
        }

        /**
         * Compares the topic replica standard deviations of the two stats with a tolerance of 1e-5.
         *
         * @param clusterModelStats  stats reported as "post-optimization" in the explanation
         * @param clusterModelStats2 stats reported as "pre-optimization" in the explanation
         * @return the result of {@code AnalyzerUtils.compare(stDev2, stDev1, 1e-5)}
         */
        @Override
        public int compare(ClusterModelStats clusterModelStats, ClusterModelStats clusterModelStats2) {
            final double postStDev = clusterModelStats.topicReplicaStats().get(Statistic.ST_DEV).doubleValue();
            final double preStDev = clusterModelStats2.topicReplicaStats().get(Statistic.ST_DEV).doubleValue();
            final int outcome = AnalyzerUtils.compare(preStDev, postStDev, 1.0E-5d);
            if (outcome >= 0) {
                // Nothing to explain; the previously recorded reason (if any) is left untouched.
                return outcome;
            }
            this._reasonForLastNegativeResult = String.format(
                "Violated %s. [Std Deviation of Topic Replica Distribution] post-optimization:%.3f pre-optimization:%.3f",
                TopicReplicaDistributionGoal.this.name(), postStDev, preStDev);
            return outcome;
        }

        /**
         * @return the explanation stored by the last negative comparison, or {@code null} if there was none yet
         */
        @Override
        public String explainLastComparison() {
            return this._reasonForLastNegativeResult;
        }
    }

    /**
     * Creates a goal whose per-topic bookkeeping maps are all empty.
     * The balancing constraint is not assigned by this constructor.
     */
    public TopicReplicaDistributionGoal() {
        // Diamond operator instead of raw HashMap: keeps the assignments type-checked.
        this._brokerIdsAboveBalanceUpperLimitByTopic = new HashMap<>();
        this._brokerIdsUnderBalanceLowerLimitByTopic = new HashMap<>();
        this._avgTopicReplicasOnAliveBroker = new HashMap<>();
        this._balanceUpperLimitByTopic = new HashMap<>();
        this._balanceLowerLimitByTopic = new HashMap<>();
    }

    /**
     * Creates a goal with empty bookkeeping maps and the given balancing constraint.
     *
     * @param balancingConstraint constraint stored in the {@code _balancingConstraint} field
     */
    public TopicReplicaDistributionGoal(BalancingConstraint balancingConstraint) {
        this();
        _balancingConstraint = balancingConstraint;
    }

    /**
     * Computes the allowed relative deviation from the average, scaled by {@link #BALANCE_MARGIN}.
     * When the optimization was triggered by a goal violation, the configured balance percentage is first
     * multiplied by the goal-violation distribution threshold multiplier.
     *
     * @param optimizationOptions options telling whether the run was triggered by a goal violation
     * @return {@code (balancePercentage - 1) * BALANCE_MARGIN}
     */
    private double balancePercentageWithMargin(OptimizationOptions optimizationOptions) {
        final boolean triggeredByViolation = optimizationOptions.isTriggeredByGoalViolation();
        double balancePercentage = this._balancingConstraint.topicReplicaBalancePercentage().doubleValue();
        if (triggeredByViolation) {
            balancePercentage = balancePercentage * this._balancingConstraint.goalViolationDistributionThresholdMultiplier().doubleValue();
        }
        return (balancePercentage - 1.0d) * BALANCE_MARGIN;
    }

    /**
     * Computes the upper balance limit of a topic: the stored per-broker average scaled up by the balance
     * percentage with margin, rounded up.
     *
     * @param str                 topic name; must have an entry in {@code _avgTopicReplicasOnAliveBroker}
     * @param optimizationOptions options forwarded to {@link #balancePercentageWithMargin}
     * @return the rounded-up upper limit
     */
    private int balanceUpperLimit(String str, OptimizationOptions optimizationOptions) {
        final double average = this._avgTopicReplicasOnAliveBroker.get(str).doubleValue();
        final double scale = 1.0d + balancePercentageWithMargin(optimizationOptions);
        final double limit = Math.ceil(average * scale);
        return (int) limit;
    }

    /**
     * Computes the lower balance limit of a topic: the stored per-broker average scaled down by the balance
     * percentage with margin (the scale never drops below zero), rounded down.
     *
     * @param str                 topic name; must have an entry in {@code _avgTopicReplicasOnAliveBroker}
     * @param optimizationOptions options forwarded to {@link #balancePercentageWithMargin}
     * @return the rounded-down lower limit
     */
    private int balanceLowerLimit(String str, OptimizationOptions optimizationOptions) {
        final double average = this._avgTopicReplicasOnAliveBroker.get(str).doubleValue();
        final double scale = Math.max(0.0d, 1.0d - balancePercentageWithMargin(optimizationOptions));
        final double limit = Math.floor(average * scale);
        return (int) limit;
    }

    /**
     * Decides whether a balancing action proposed elsewhere keeps this goal's per-topic limits intact.
     * <ul>
     *   <li>Leadership movements are always accepted.</li>
     *   <li>A replica movement is accepted only if the destination stays at or under the upper limit and the
     *       source stays at or above the lower limit for the moved topic.</li>
     *   <li>A swap within the same topic is always accepted; otherwise both directions of the swap must
     *       satisfy the movement rule above.</li>
     * </ul>
     *
     * @param balancingAction the proposed action
     * @param clusterModel    the model used to look up the source and destination brokers
     * @return {@code ACCEPT} or {@code REPLICA_REJECT}
     * @throws IllegalArgumentException if the action type is none of the three handled types
     */
    @Override
    public ActionAcceptance actionAcceptance(BalancingAction balancingAction, ClusterModel clusterModel) {
        final Broker source = clusterModel.broker(balancingAction.sourceBrokerId().intValue());
        final Broker destination = clusterModel.broker(balancingAction.destinationBrokerId().intValue());
        final String sourceTopic = balancingAction.topic();
        switch (balancingAction.balancingAction()) {
            case LEADERSHIP_MOVEMENT:
                return ActionAcceptance.ACCEPT;
            case INTER_BROKER_REPLICA_MOVEMENT:
                if (!isReplicaCountUnderBalanceUpperLimitAfterChange(sourceTopic, destination, ReplicaDistributionAbstractGoal.ChangeType.ADD)) {
                    return ActionAcceptance.REPLICA_REJECT;
                }
                if (!isReplicaCountAboveBalanceLowerLimitAfterChange(sourceTopic, source, ReplicaDistributionAbstractGoal.ChangeType.REMOVE)) {
                    return ActionAcceptance.REPLICA_REJECT;
                }
                return ActionAcceptance.ACCEPT;
            case INTER_BROKER_REPLICA_SWAP:
                final String destinationTopic = balancingAction.destinationTopic();
                if (sourceTopic.equals(destinationTopic)) {
                    // Swapping two replicas of one topic leaves every per-topic count unchanged.
                    return ActionAcceptance.ACCEPT;
                }
                // Source topic replica travels source -> destination.
                if (!isReplicaCountUnderBalanceUpperLimitAfterChange(sourceTopic, destination, ReplicaDistributionAbstractGoal.ChangeType.ADD)) {
                    return ActionAcceptance.REPLICA_REJECT;
                }
                if (!isReplicaCountAboveBalanceLowerLimitAfterChange(sourceTopic, source, ReplicaDistributionAbstractGoal.ChangeType.REMOVE)) {
                    return ActionAcceptance.REPLICA_REJECT;
                }
                // Destination topic replica travels destination -> source.
                if (!isReplicaCountUnderBalanceUpperLimitAfterChange(destinationTopic, source, ReplicaDistributionAbstractGoal.ChangeType.ADD)) {
                    return ActionAcceptance.REPLICA_REJECT;
                }
                if (!isReplicaCountAboveBalanceLowerLimitAfterChange(destinationTopic, destination, ReplicaDistributionAbstractGoal.ChangeType.REMOVE)) {
                    return ActionAcceptance.REPLICA_REJECT;
                }
                return ActionAcceptance.ACCEPT;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + balancingAction.balancingAction() + " is provided.");
        }
    }

    /**
     * Checks whether the broker would hold at most the upper limit of replicas of the topic after one replica
     * is added or removed. A broker that is not alive has an upper limit of zero.
     *
     * @param str        topic name
     * @param broker     broker whose replica count is examined
     * @param changeType {@code ADD} to simulate one more replica; any other value simulates one fewer
     * @return true if the simulated count does not exceed the limit
     */
    private boolean isReplicaCountUnderBalanceUpperLimitAfterChange(String str, Broker broker, ReplicaDistributionAbstractGoal.ChangeType changeType) {
        final int currentCount = broker.numReplicasOfTopicInBroker(str);
        final int upperLimit;
        if (broker.isAlive()) {
            upperLimit = this._balanceUpperLimitByTopic.get(str).intValue();
        } else {
            upperLimit = 0;
        }
        final int delta = (changeType == ReplicaDistributionAbstractGoal.ChangeType.ADD) ? 1 : -1;
        return currentCount + delta <= upperLimit;
    }

    /**
     * Checks whether the broker would hold at least the lower limit of replicas of the topic after one replica
     * is added or removed. A broker that is not alive has a lower limit of zero.
     *
     * @param str        topic name
     * @param broker     broker whose replica count is examined
     * @param changeType {@code ADD} to simulate one more replica; any other value simulates one fewer
     * @return true if the simulated count does not fall below the limit
     */
    private boolean isReplicaCountAboveBalanceLowerLimitAfterChange(String str, Broker broker, ReplicaDistributionAbstractGoal.ChangeType changeType) {
        final int currentCount = broker.numReplicasOfTopicInBroker(str);
        final int lowerLimit;
        if (broker.isAlive()) {
            lowerLimit = this._balanceLowerLimitByTopic.get(str).intValue();
        } else {
            lowerLimit = 0;
        }
        final int delta = (changeType == ReplicaDistributionAbstractGoal.ChangeType.ADD) ? 1 : -1;
        return currentCount + delta >= lowerLimit;
    }

    /**
     * Supplies a fresh comparator that ranks cluster model stats by topic replica standard deviation.
     *
     * @return a new {@code TopicReplicaDistrGoalStatsComparator} bound to this goal
     */
    @Override
    public Goal.ClusterModelStatsComparator clusterModelStatsComparator() {
        final Goal.ClusterModelStatsComparator comparator = new TopicReplicaDistrGoalStatsComparator();
        return comparator;
    }

    /**
     * Supplies the model completeness requirements of this goal.
     * NOTE(review): the meaning of the three constructor arguments (1, 0.0, true) is defined by
     * {@code ModelCompletenessRequirements} and is not visible from this file.
     *
     * @return a new requirements object built with the arguments {@code (1, 0.0, true)}
     */
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        final ModelCompletenessRequirements requirements = new ModelCompletenessRequirements(1, 0.0d, true);
        return requirements;
    }

    /**
     * @return the simple class name of {@code TopicReplicaDistributionGoal}, used as this goal's name
     */
    @Override
    public String name() {
        final Class<TopicReplicaDistributionGoal> goalClass = TopicReplicaDistributionGoal.class;
        return goalClass.getSimpleName();
    }

    /**
     * Reports whether this goal is a hard goal.
     *
     * @return always {@code false}
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public boolean isHardGoal() {
        return false;
    }

    /**
     * Selects the brokers this goal will try to balance.
     *
     * @param clusterModel the cluster model being optimized
     * @return whatever {@code clusterModel.brokers()} returns, i.e. no broker is filtered out here
     */
    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        final SortedSet<Broker> everyBroker = clusterModel.brokers();
        return everyBroker;
    }

    /**
     * Determines the topics this goal will rebalance.
     * <ul>
     *   <li>If the model has no self-healing eligible replicas: every topic of the cluster except the
     *       excluded ones.</li>
     *   <li>Otherwise: exactly the topics of the self-healing eligible replicas (the excluded set is not
     *       applied in this branch).</li>
     * </ul>
     * A warning is logged when the resulting set is empty.
     *
     * @param clusterModel the cluster model being optimized
     * @param set          topics excluded from rebalancing
     * @return a new mutable set of topic names to rebalance
     */
    private Set<String> topicsToRebalance(ClusterModel clusterModel, Set<String> set) {
        final Set<String> topics;
        if (clusterModel.selfHealingEligibleReplicas().isEmpty()) {
            topics = new HashSet<>(clusterModel.topics());
            topics.removeAll(set);
        } else {
            topics = new HashSet<>();
            for (Replica replica : clusterModel.selfHealingEligibleReplicas()) {
                topics.add(replica.topicPartition().topic());
            }
        }
        if (topics.isEmpty()) {
            LOG.warn("All topics are excluded from {}.", name());
        }
        return topics;
    }

    /**
     * Initializes the per-topic state before optimization starts.
     * <ol>
     *   <li>For every topic in the cluster, stores the average number of replicas per alive broker and the
     *       upper/lower balance limits derived from it.</li>
     *   <li>Drops the average again for topics that are not to be rebalanced; the limits stay, and the
     *       missing average is what marks the topic as excluded.</li>
     *   <li>Registers, for every broker, the sorted-replica tracking used when choosing replicas to move.</li>
     *   <li>Resets the "fix offline replicas only" mode.</li>
     * </ol>
     *
     * @param clusterModel        the cluster model being optimized
     * @param optimizationOptions options providing excluded topics and the immigrant-only switch
     */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) {
        final Set<String> excludedTopics = optimizationOptions.excludedTopics();
        final Set<String> rebalancedTopics = topicsToRebalance(clusterModel, excludedTopics);
        // Loop-invariant: the set of alive brokers is not modified while the limits are computed.
        final int numAliveBrokers = clusterModel.aliveBrokers().size();
        for (String topic : clusterModel.topics()) {
            // The cast forces floating-point division; an integral quotient would truncate the average
            // (e.g. 7 replicas / 4 brokers -> 1 instead of 1.75) and skew both limits.
            final double average = (double) clusterModel.numTopicReplicas(topic) / numAliveBrokers;
            this._avgTopicReplicasOnAliveBroker.put(topic, average);
            this._balanceUpperLimitByTopic.put(topic, balanceUpperLimit(topic, optimizationOptions));
            this._balanceLowerLimitByTopic.put(topic, balanceLowerLimit(topic, optimizationOptions));
            if (!rebalancedTopics.contains(topic)) {
                // The average was only needed to derive the limits above.
                this._avgTopicReplicasOnAliveBroker.remove(topic);
            }
        }
        for (Broker broker : clusterModel.brokers()) {
            new SortedReplicasHelper()
                .maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrants(), optimizationOptions.onlyMoveImmigrantReplicas())
                .maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrantOrOfflineReplicas(),
                                       !clusterModel.selfHealingEligibleReplicas().isEmpty() && broker.isAlive())
                .addSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(excludedTopics))
                .trackSortedReplicasFor(GoalUtils.replicaSortName(this, false, false), broker);
        }
        this._fixOfflineReplicasOnly = false;
    }

    /**
     * Checks whether an action proposed by this goal satisfies the goal itself.
     * While in "fix offline replicas only" mode, an action on a currently-offline source replica is
     * satisfactory exactly when it is an inter-broker replica movement. In every other case the destination
     * must stay at or under the upper limit and the source at or above the lower limit of the topic.
     *
     * @param clusterModel    the model used to look up the brokers
     * @param balancingAction the proposed action
     * @return true if the action is acceptable to this goal
     */
    @Override
    protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction balancingAction) {
        final Broker source = clusterModel.broker(balancingAction.sourceBrokerId().intValue());
        final boolean movingOfflineReplica =
            this._fixOfflineReplicasOnly && source.replica(balancingAction.topicPartition()).isCurrentOffline();
        if (movingOfflineReplica) {
            return balancingAction.balancingAction() == ActionType.INTER_BROKER_REPLICA_MOVEMENT;
        }
        final Broker destination = clusterModel.broker(balancingAction.destinationBrokerId().intValue());
        final String topic = balancingAction.topic();
        if (!isReplicaCountUnderBalanceUpperLimitAfterChange(topic, destination, ReplicaDistributionAbstractGoal.ChangeType.ADD)) {
            return false;
        }
        return isReplicaCountAboveBalanceLowerLimitAfterChange(topic, source, ReplicaDistributionAbstractGoal.ChangeType.REMOVE);
    }

    /**
     * Updates the goal state after a balancing pass.
     * If any broker was recorded as above or under its limit, the records are cleared and the goal is marked
     * as not succeeded. Then the offline-replica and bad-disk sanity checks run: when they pass, the goal is
     * finished; when they fail for the first time, the goal switches to "fix offline replicas only" mode so
     * another pass can ignore the balance limits; when they fail again in that mode, the failure propagates.
     *
     * @param clusterModel        the cluster model being optimized
     * @param optimizationOptions unused by this implementation
     * @throws OptimizationFailureException if the sanity checks fail while already in
     *                                      "fix offline replicas only" mode
     */
    @Override
    protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        final boolean anyBrokerOutOfLimits = !this._brokerIdsAboveBalanceUpperLimitByTopic.isEmpty()
                                             || !this._brokerIdsUnderBalanceLowerLimitByTopic.isEmpty();
        if (anyBrokerOutOfLimits) {
            this._brokerIdsAboveBalanceUpperLimitByTopic.clear();
            this._brokerIdsUnderBalanceLowerLimitByTopic.clear();
            this._succeeded = false;
        }
        try {
            GoalUtils.ensureNoOfflineReplicas(clusterModel, name());
            GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name());
            finish();
        } catch (OptimizationFailureException failure) {
            if (this._fixOfflineReplicasOnly) {
                // Already retried without balance limits; nothing more this goal can do.
                throw failure;
            }
            this._fixOfflineReplicasOnly = true;
            LOG.info("Ignoring topic replica balance limit to move replicas from dead brokers/disks.");
        }
    }

    /**
     * Marks this goal as finished by setting the {@code _finished} flag (declared outside this class) to true.
     */
    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public void finish() {
        this._finished = true;
    }

    /**
     * Decides whether rebalancing the given topic replicas on a broker can be skipped.
     * The broker is skipped when any of the following holds:
     * <ol>
     *   <li>it is alive and requires neither more nor fewer replicas;</li>
     *   <li>the cluster has new brokers, this broker is not new, and it does not require fewer replicas;</li>
     *   <li>the cluster has self-healing eligible replicas, the broker requires fewer replicas, but none of
     *       the given replicas is offline or an immigrant;</li>
     *   <li>only immigrant replicas may move, the broker requires fewer replicas, but none of the given
     *       replicas is an immigrant.</li>
     * </ol>
     *
     * @param broker       broker being considered
     * @param clusterModel the cluster model being optimized
     * @param collection   replicas of the topic being balanced that reside on the broker
     * @param z            whether the broker requires fewer replicas of the topic
     * @param z2           whether the broker requires more replicas of the topic
     * @param z3           whether the broker has currently-offline replicas of the topic
     * @param z4           whether only immigrant replicas are allowed to move
     * @return true if the broker should be skipped
     */
    private static boolean skipBrokerRebalance(Broker broker, ClusterModel clusterModel, Collection<Replica> collection, boolean z, boolean z2, boolean z3, boolean z4) {
        final boolean requireLessReplicas = z;
        final boolean requireMoreReplicas = z2;
        final boolean hasOfflineTopicReplicas = z3;
        final boolean moveImmigrantReplicaOnly = z4;

        if (broker.isAlive() && !requireMoreReplicas && !requireLessReplicas) {
            LOG.trace("Skip rebalance: Broker {} is already within the limit for replicas {}.", broker, collection);
            return true;
        }
        if (!clusterModel.newBrokers().isEmpty() && !broker.isNew() && !requireLessReplicas) {
            LOG.trace("Skip rebalance: Cluster has new brokers and this broker {} is not new, but does not require less load for replicas {}. Hence, it does not have any offline replicas.", broker, collection);
            return true;
        }
        // Both remaining rules apply only to brokers that must shed replicas.
        if (!requireLessReplicas) {
            return false;
        }
        final boolean selfHealingWithoutOfflineReplicas =
            !clusterModel.selfHealingEligibleReplicas().isEmpty() && !hasOfflineTopicReplicas;
        if (!selfHealingWithoutOfflineReplicas && !moveImmigrantReplicaOnly) {
            return false;
        }
        // The scan over the replicas is deferred to this point so the early exits above never pay for it.
        final boolean hasImmigrantTopicReplicas =
            collection.stream().anyMatch(replica -> broker.immigrantReplicas().contains(replica));
        if (hasImmigrantTopicReplicas) {
            return false;
        }
        if (selfHealingWithoutOfflineReplicas) {
            LOG.trace("Skip rebalance: Cluster is in self-healing mode and the broker {} requires less load, but none of its current offline or immigrant replicas are from the topic being balanced {}.", broker, collection);
            return true;
        }
        LOG.trace("Skip rebalance: Only immigrant replicas can be moved, but none of broker {}'s current immigrant replicas are from the topic being balanced {}.", broker, collection);
        return true;
    }

    /**
     * Returns the subset of the given replicas that are among the broker's currently-offline replicas.
     * The input collection is not modified.
     *
     * @param broker     broker whose {@code currentOfflineReplicas()} are used as the filter
     * @param collection replicas to filter
     * @return a new set holding the replicas present in both collections
     */
    private static Set<Replica> retainCurrentOfflineBrokerReplicas(Broker broker, Collection<Replica> collection) {
        final Set<Replica> offlineReplicas = new HashSet<>(collection);
        offlineReplicas.retainAll(broker.currentOfflineReplicas());
        return offlineReplicas;
    }

    /**
     * Tells whether a topic is excluded from rebalancing, i.e. whether no average replica count is stored
     * for it.
     *
     * @param str topic name
     * @return true if {@code _avgTopicReplicasOnAliveBroker} yields {@code null} for the topic
     */
    private boolean isTopicExcludedFromRebalance(String str) {
        final Double storedAverage = this._avgTopicReplicasOnAliveBroker.get(str);
        return storedAverage == null;
    }

    /**
     * Tries to bring every non-excluded topic hosted on the broker within its balance limits.
     * For each topic the broker is first classified as requiring fewer replicas (it has offline replicas of
     * the topic or exceeds the upper limit) and/or more replicas (it is alive and its online replicas are
     * below the lower limit). Unless the broker can be skipped, replicas are moved out and/or in, and the
     * broker id is recorded per topic whenever the corresponding move helper reports failure.
     *
     * @param broker              broker to rebalance
     * @param clusterModel        the cluster model being optimized
     * @param set                 goals that were optimized earlier and must stay satisfied
     * @param optimizationOptions options forwarded to the move helpers
     */
    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        LOG.debug("Rebalancing broker {} [limits] lower: {} upper: {}.", broker.id(), this._balanceLowerLimitByTopic, this._balanceUpperLimitByTopic);
        for (String topic : broker.topics()) {
            if (isTopicExcludedFromRebalance(topic)) {
                continue;
            }
            final Collection<Replica> topicReplicas = broker.replicasOfTopicInBroker(topic);
            final int numTopicReplicas = topicReplicas.size();
            final int numOfflineTopicReplicas = retainCurrentOfflineBrokerReplicas(broker, topicReplicas).size();
            final boolean hasOfflineTopicReplicas = numOfflineTopicReplicas > 0;
            final boolean requireLessReplicas =
                hasOfflineTopicReplicas || numTopicReplicas > this._balanceUpperLimitByTopic.get(topic).intValue();
            final boolean requireMoreReplicas =
                broker.isAlive() && numTopicReplicas - numOfflineTopicReplicas < this._balanceLowerLimitByTopic.get(topic).intValue();
            if (skipBrokerRebalance(broker, clusterModel, topicReplicas, requireLessReplicas, requireMoreReplicas,
                                    hasOfflineTopicReplicas, optimizationOptions.onlyMoveImmigrantReplicas())) {
                continue;
            }

            // A true result from either helper means the broker could not be brought within the limit.
            if (requireLessReplicas && rebalanceByMovingReplicasOut(broker, topic, clusterModel, set, optimizationOptions)) {
                this._brokerIdsAboveBalanceUpperLimitByTopic.computeIfAbsent(topic, key -> new HashSet<>()).add(broker.id());
                LOG.debug("Failed to sufficiently decrease replicas of topic {} in broker {} with replica movements. Replicas: {}.", topic, broker.id(), broker.numReplicasOfTopicInBroker(topic));
            }
            if (requireMoreReplicas && rebalanceByMovingReplicasIn(broker, topic, clusterModel, set, optimizationOptions)) {
                this._brokerIdsUnderBalanceLowerLimitByTopic.computeIfAbsent(topic, key -> new HashSet<>()).add(broker.id());
                LOG.debug("Failed to sufficiently increase replicas of topic {} in broker {} with replica movements. Replicas: {}.", topic, broker.id(), broker.numReplicasOfTopicInBroker(topic));
            }

            final Set<Integer> noBrokers = Collections.emptySet();
            if (this._brokerIdsAboveBalanceUpperLimitByTopic.getOrDefault(topic, noBrokers).contains(broker.id())) {
                continue;
            }
            if (this._brokerIdsUnderBalanceLowerLimitByTopic.getOrDefault(topic, noBrokers).contains(broker.id())) {
                continue;
            }
            LOG.debug("Successfully balanced replicas of topic {} in broker {} by moving replicas. Replicas: {}", topic, broker.id(), broker.numReplicasOfTopicInBroker(topic));
        }
    }

    /**
     * Collects the replicas of a topic on a broker that are candidates for being moved away: the broker's
     * replicas of the topic, restricted to those present in the sorted-replica tracking registered for this
     * goal, ordered by the broker's replica comparator.
     *
     * @param broker broker hosting the replicas
     * @param str    topic name
     * @return a new sorted set of movable replicas; changes to it do not affect the broker
     */
    private SortedSet<Replica> replicasToMoveOut(Broker broker, String str) {
        final SortedSet<Replica> movableReplicas = new TreeSet<>(broker.replicaComparator());
        movableReplicas.addAll(broker.replicasOfTopicInBroker(str));
        movableReplicas.retainAll(broker.trackedSortedReplicas(GoalUtils.replicaSortName(this, false, false)).sortedReplicas(false));
        return movableReplicas;
    }

    /**
     * Tries to reduce the number of replicas of a topic on a broker by moving them to other alive brokers.
     * Candidate destinations are ordered by (replica count of the topic, broker id). Outside of
     * "fix offline replicas only" mode, only brokers strictly under the topic's upper limit are candidates.
     *
     * @param broker              broker to move replicas away from
     * @param str                 topic name
     * @param clusterModel        the cluster model being optimized
     * @param set                 goals that were optimized earlier and must stay satisfied
     * @param optimizationOptions options forwarded to {@code maybeApplyBalancingAction}
     * @return {@code false} once the broker is within its target (or an unmovable offline replica makes
     *         further moves pointless while the count is within the limit); otherwise {@code true} if the
     *         broker still hosts replicas of the topic after every candidate replica was tried
     */
    private boolean rebalanceByMovingReplicasOut(Broker broker, String str, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        final int upperLimit = this._balanceUpperLimitByTopic.get(str).intValue();

        // The explicit lambda parameter type is required for comparingInt to infer Broker rather than Object.
        final SortedSet<Broker> candidateBrokers = new TreeSet<>(
            Comparator.comparingInt((Broker candidate) -> candidate.numReplicasOfTopicInBroker(str))
                      .thenComparingInt(Broker::id));
        for (Broker aliveBroker : clusterModel.aliveBrokers()) {
            if (this._fixOfflineReplicasOnly || aliveBroker.numReplicasOfTopicInBroker(str) < upperLimit) {
                candidateBrokers.add(aliveBroker);
            }
        }

        final Collection<Replica> topicReplicas = broker.replicasOfTopicInBroker(str);
        int numTopicReplicas = topicReplicas.size();
        int numOfflineTopicReplicas = retainCurrentOfflineBrokerReplicas(broker, topicReplicas).size();
        boolean wasUnableToMoveOfflineReplica = false;

        for (Replica replica : replicasToMoveOut(broker, str)) {
            final boolean wasOffline = replica.isCurrentOffline();
            if (wasUnableToMoveOfflineReplica && !wasOffline && numTopicReplicas <= upperLimit) {
                // An offline replica could not be moved and the remaining count is within the limit.
                return false;
            }
            final Broker destination = maybeApplyBalancingAction(clusterModel, replica, candidateBrokers,
                                                                 ActionType.INTER_BROKER_REPLICA_MOVEMENT, set, optimizationOptions);
            if (destination == null) {
                if (wasOffline) {
                    wasUnableToMoveOfflineReplica = true;
                }
                continue;
            }
            if (wasOffline) {
                numOfflineTopicReplicas--;
            }
            numTopicReplicas--;
            // While offline replicas remain, the target is zero so that all of them get a chance to move.
            final int target = numOfflineTopicReplicas == 0 ? upperLimit : 0;
            if (numTopicReplicas <= target) {
                return false;
            }
            // Reposition the destination. Its replica count is part of the sort key and has already changed
            // with the move, so a comparator-based remove() may fail to locate the old entry and the add()
            // below would then leave a duplicate. removeIf() removes through the iterator and does not depend
            // on the key.
            candidateBrokers.removeIf(candidate -> candidate.id() == destination.id());
            if (this._fixOfflineReplicasOnly || destination.numReplicasOfTopicInBroker(str) < upperLimit) {
                candidateBrokers.add(destination);
            }
        }
        // Reaching this point with replicas left means the target was not met.
        return !broker.replicasOfTopicInBroker(str).isEmpty();
    }

    /**
     * Tries to increase the number of replicas of a topic on a broker by pulling replicas from other brokers.
     * Source brokers are polled in order of (1) most currently-offline replicas of the topic, (2) most
     * replicas of the topic, (3) lowest broker id. In "fix offline replicas only" mode every other broker is
     * a source; otherwise a broker is a source if it holds more replicas of the topic than the lower limit
     * or has any currently-offline replica.
     *
     * @param broker              broker to move replicas to
     * @param str                 topic name
     * @param clusterModel        the cluster model being optimized
     * @param set                 goals that were optimized earlier and must stay satisfied
     * @param optimizationOptions options forwarded to {@code maybeApplyBalancingAction}
     * @return {@code false} once the broker reaches the lower limit; {@code true} if the sources were
     *         exhausted before that
     */
    private boolean rebalanceByMovingReplicasIn(Broker broker, String str, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        final int lowerLimit = this._balanceLowerLimitByTopic.get(str).intValue();

        final PriorityQueue<Broker> eligibleBrokers = new PriorityQueue<>((b1, b2) -> {
            final Collection<Replica> replicasInB2 = b2.replicasOfTopicInBroker(str);
            final int numOfflineInB2 = retainCurrentOfflineBrokerReplicas(b2, replicasInB2).size();
            final Collection<Replica> replicasInB1 = b1.replicasOfTopicInBroker(str);
            // Each broker's offline count must be computed against its OWN offline replicas.
            final int numOfflineInB1 = retainCurrentOfflineBrokerReplicas(b1, replicasInB1).size();

            final int byOfflineReplicas = Integer.compare(numOfflineInB2, numOfflineInB1);
            if (byOfflineReplicas != 0) {
                return byOfflineReplicas;
            }
            final int byAllReplicas = Integer.compare(replicasInB2.size(), replicasInB1.size());
            return byAllReplicas != 0 ? byAllReplicas : Integer.compare(b1.id(), b2.id());
        });

        for (Broker candidateSource : clusterModel.brokers()) {
            final boolean eligible;
            if (this._fixOfflineReplicasOnly) {
                eligible = candidateSource.id() != broker.id();
            } else {
                eligible = candidateSource.numReplicasOfTopicInBroker(str) > lowerLimit
                           || !candidateSource.currentOfflineReplicas().isEmpty();
            }
            if (eligible) {
                eligibleBrokers.add(candidateSource);
            }
        }

        int numTopicReplicas = broker.replicasOfTopicInBroker(str).size();
        final Set<Broker> candidateBrokers = Collections.singleton(broker);

        while (!eligibleBrokers.isEmpty()) {
            final Broker sourceBroker = eligibleBrokers.poll();
            final SortedSet<Replica> replicasToMove = replicasToMoveOut(sourceBroker, str);
            int numOfflineTopicReplicas = retainCurrentOfflineBrokerReplicas(sourceBroker, replicasToMove).size();

            // A bounded for-each: once this source has no replicas left to try, move on to the next source.
            for (Replica replica : replicasToMove) {
                final boolean wasOffline = replica.isCurrentOffline();
                final Broker destination = maybeApplyBalancingAction(clusterModel, replica, candidateBrokers,
                                                                     ActionType.INTER_BROKER_REPLICA_MOVEMENT, set, optimizationOptions);
                if (destination == null) {
                    // Nothing moved; a source that never yields a move is never re-enqueued.
                    continue;
                }
                if (wasOffline) {
                    numOfflineTopicReplicas--;
                }
                numTopicReplicas++;
                if (numTopicReplicas >= lowerLimit) {
                    return false;
                }
                // Switch to the next source once this one has no offline replicas left and holds fewer
                // replicas of the topic than the head of the queue; re-enqueue it for a later turn.
                if (!eligibleBrokers.isEmpty() && numOfflineTopicReplicas == 0
                    && sourceBroker.numReplicasOfTopicInBroker(str) < eligibleBrokers.peek().numReplicasOfTopicInBroker(str)) {
                    eligibleBrokers.add(sourceBroker);
                    break;
                }
            }
        }
        return true;
    }
}
