/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractRackAwareGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.model.SortedReplicasHelper;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

public class RackAwareDistributionGoal
extends AbstractRackAwareGoal {
    private BalanceLimit _balanceLimit;

    public RackAwareDistributionGoal() {
    }

    RackAwareDistributionGoal(BalancingConstraint constraint) {
        this._balancingConstraint = constraint;
    }

    @Override
    protected boolean doesReplicaMoveViolateActionAcceptance(ClusterModel clusterModel, Replica sourceReplica, Broker destinationBroker) {
        String destinationRackId = destinationBroker.rack().id();
        String sourceRackId = sourceReplica.broker().rack().id();
        if (sourceRackId.equals(destinationRackId)) {
            return false;
        }
        Set<Broker> partitionBrokers = clusterModel.partition(sourceReplica.topicPartition()).partitionBrokers();
        Map<String, Integer> numReplicasByRack = RackAwareDistributionGoal.numPartitionReplicasByRackId(partitionBrokers);
        return numReplicasByRack.getOrDefault(destinationRackId, 0) >= numReplicasByRack.getOrDefault(sourceRackId, 0);
    }

    private static Map<String, Integer> numPartitionReplicasByRackId(Set<Broker> partitionBrokers) {
        HashMap<String, Integer> numPartitionReplicasByRackId = new HashMap<String, Integer>(partitionBrokers.size());
        for (Broker broker : partitionBrokers) {
            numPartitionReplicasByRackId.merge(broker.rack().id(), 1, Integer::sum);
        }
        return numPartitionReplicasByRackId;
    }

    @Override
    public String name() {
        return RackAwareDistributionGoal.class.getSimpleName();
    }

    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) {
        this._balanceLimit = new BalanceLimit(clusterModel);
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        new SortedReplicasHelper().maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrants(), optimizationOptions.onlyMoveImmigrantReplicas()).addSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(excludedTopics)).trackSortedReplicasFor(GoalUtils.replicaSortName(this, false, false), clusterModel);
    }

    @Override
    protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        GoalUtils.ensureNoOfflineReplicas(clusterModel, this.name());
        this.ensureRackAwareDistribution(clusterModel, optimizationOptions);
        GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, this.name());
        this.finish();
    }

    @Override
    public void finish() {
        super.finish();
        this._balanceLimit.clear();
    }

    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        this.rebalanceForBroker(broker, clusterModel, optimizedGoals, optimizationOptions, false);
    }

    @Override
    protected SortedSet<Broker> rackAwareEligibleBrokers(Replica replica, ClusterModel clusterModel) {
        Set<Broker> partitionBrokers = clusterModel.partition(replica.topicPartition()).partitionBrokers();
        Map<String, Integer> numReplicasByRack = RackAwareDistributionGoal.numPartitionReplicasByRackId(partitionBrokers);
        numReplicasByRack.merge(replica.broker().rack().id(), -1, Integer::sum);
        int baseLimit = this._balanceLimit.baseLimitByRF(partitionBrokers.size());
        boolean canMoveToRacksAtBaseLimit = false;
        int numRacksWithOneMoreReplicaLimit = this._balanceLimit.numRacksWithOneMoreReplicaByRF(partitionBrokers.size());
        int numRacksWithAtLeastOneMoreReplica = (int)numReplicasByRack.values().stream().filter(r -> r > baseLimit).count();
        if (numRacksWithAtLeastOneMoreReplica < numRacksWithOneMoreReplicaLimit) {
            canMoveToRacksAtBaseLimit = true;
        }
        TreeSet<Broker> rackAwareDistributionEligibleBrokers = new TreeSet<Broker>(Comparator.comparingInt(b -> numReplicasByRack.getOrDefault(b.rack().id(), 0)).thenComparingInt(Broker::id));
        for (Broker destinationBroker : clusterModel.aliveBrokers()) {
            int numReplicasInThisRack = numReplicasByRack.getOrDefault(destinationBroker.rack().id(), 0);
            if (numReplicasInThisRack >= baseLimit && (!canMoveToRacksAtBaseLimit || numReplicasInThisRack != baseLimit) || partitionBrokers.contains(destinationBroker)) continue;
            rackAwareDistributionEligibleBrokers.add(destinationBroker);
        }
        return rackAwareDistributionEligibleBrokers;
    }

    @Override
    protected boolean shouldKeepInTheCurrentRack(Replica replica, ClusterModel clusterModel) {
        Set<Broker> partitionBrokers = clusterModel.partition(replica.topicPartition()).partitionBrokers();
        int replicationFactor = partitionBrokers.size();
        Map<String, Integer> numReplicasByRack = RackAwareDistributionGoal.numPartitionReplicasByRackId(partitionBrokers);
        int baseLimit = this._balanceLimit.baseLimitByRF(replicationFactor);
        int numRacksWithOneMoreReplicaLimit = this._balanceLimit.numRacksWithOneMoreReplicaByRF(replicationFactor);
        int upperLimit = baseLimit + (numRacksWithOneMoreReplicaLimit == 0 ? 0 : 1);
        int numReplicasInThisRack = numReplicasByRack.get(replica.broker().rack().id());
        if (numReplicasInThisRack <= baseLimit) {
            return true;
        }
        if (numReplicasInThisRack > upperLimit) {
            return false;
        }
        int numRacksWithOneMoreReplica = (int)numReplicasByRack.values().stream().filter(r -> r > baseLimit).count();
        return numRacksWithOneMoreReplica <= numRacksWithOneMoreReplicaLimit;
    }

    private void ensureRackAwareDistribution(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        for (Replica leader : clusterModel.leaderReplicas()) {
            boolean someAliveRacksHaveNoReplicas;
            Set<Broker> partitionBrokers;
            Map<String, Integer> numReplicasByRack;
            int maxNumReplicasInARack;
            if (excludedTopics.contains(leader.topicPartition().topic()) || (maxNumReplicasInARack = Collections.max((numReplicasByRack = RackAwareDistributionGoal.numPartitionReplicasByRackId(partitionBrokers = clusterModel.partition(leader.topicPartition()).partitionBrokers())).values()).intValue()) <= 1) continue;
            boolean bl = someAliveRacksHaveNoReplicas = numReplicasByRack.size() < this._balanceLimit.numAliveRacks();
            if (!someAliveRacksHaveNoReplicas && maxNumReplicasInARack - Collections.min(numReplicasByRack.values()) <= 1) continue;
            String mitigation = GoalUtils.mitigationForOptimizationFailures(optimizationOptions);
            throw new OptimizationFailureException(String.format("Optimization for goal %s failed for rack-aware distribution of partition %s. Brokers (%s) and replicas per rack (%s). %s", this.name(), leader.topicPartition(), partitionBrokers, numReplicasByRack, mitigation));
        }
    }

    private static class BalanceLimit {
        private final int _numAliveRacks;
        private final Map<Integer, Integer> _baseLimitByRF;
        private final Map<Integer, Integer> _numRacksWithOneMoreReplicaByRF;

        BalanceLimit(ClusterModel clusterModel) {
            this._numAliveRacks = clusterModel.numAliveRacks();
            int maxReplicationFactor = clusterModel.maxReplicationFactor();
            this._baseLimitByRF = new HashMap<Integer, Integer>(maxReplicationFactor);
            this._numRacksWithOneMoreReplicaByRF = new HashMap<Integer, Integer>(maxReplicationFactor);
            for (int replicationFactor = 1; replicationFactor <= maxReplicationFactor; ++replicationFactor) {
                int baseLimit = replicationFactor / this._numAliveRacks;
                this._baseLimitByRF.put(replicationFactor, baseLimit);
                this._numRacksWithOneMoreReplicaByRF.put(replicationFactor, replicationFactor % this._numAliveRacks);
            }
        }

        public int numAliveRacks() {
            return this._numAliveRacks;
        }

        public Integer baseLimitByRF(int replicationFactor) {
            return this._baseLimitByRF.get(replicationFactor);
        }

        public Integer numRacksWithOneMoreReplicaByRF(int replicationFactor) {
            return this._numRacksWithOneMoreReplicaByRF.get(replicationFactor);
        }

        public void clear() {
            this._baseLimitByRF.clear();
            this._numRacksWithOneMoreReplicaByRF.clear();
        }
    }
}

