/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractRackAwareGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.model.SortedReplicasHelper;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
 * Goal that checks that, for every partition of a non-excluded topic, the leader and all follower
 * replicas reside in pairwise distinct racks.
 *
 * <p>The balancing loop itself is driven by {@link AbstractRackAwareGoal}; this class supplies the
 * rack-specific eligibility rules and the sanity checks run before and after optimization.
 */
public class RackAwareGoal extends AbstractRackAwareGoal {

    /** Creates the goal without setting a balancing constraint. */
    public RackAwareGoal() {
    }

    /**
     * Package-private constructor that sets the balancing constraint directly.
     *
     * @param constraint balancing constraint stored in {@code _balancingConstraint}
     */
    RackAwareGoal(BalancingConstraint constraint) {
        this._balancingConstraint = constraint;
    }

    /**
     * Tells whether moving {@code sourceReplica} to {@code destinationBroker} would place it in a rack
     * that already hosts another replica of the same partition.
     *
     * @param clusterModel      cluster model used to look up the partition of the replica
     * @param sourceReplica     replica that would be moved
     * @param destinationBroker broker the replica would be moved to
     * @return {@code true} if the rack of any partition broker other than the source replica's broker
     *         contains {@code destinationBroker}; {@code false} otherwise
     */
    @Override
    protected boolean doesReplicaMoveViolateActionAcceptance(ClusterModel clusterModel, Replica sourceReplica, Broker destinationBroker) {
        Set<Broker> partitionBrokers = clusterModel.partition(sourceReplica.topicPartition()).partitionBrokers();
        Broker sourceBroker = sourceReplica.broker();
        for (Broker broker : partitionBrokers) {
            // Skip the source broker rather than removing it from the returned set, so this check does
            // not depend on partitionBrokers() handing out a mutable private copy.
            if (broker.equals(sourceBroker)) {
                continue;
            }
            if (broker.rack().brokers().contains(destinationBroker)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the simple class name of this goal
     */
    @Override
    public String name() {
        return RackAwareGoal.class.getSimpleName();
    }

    /**
     * Verifies that there are enough alive racks to host every replica of a partition in its own rack
     * and registers the sorted-replica tracking used by this goal.
     *
     * <p>When topics are excluded, only the replication factors of the included topics are considered;
     * otherwise the maximum replication factor of the cluster is used.
     *
     * @param clusterModel        cluster model to validate and to register sorted replicas on
     * @param optimizationOptions options providing the excluded topics and the immigrant-only flag
     * @throws OptimizationFailureException if a considered replication factor exceeds the number of alive racks
     */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        int numAliveRacks = clusterModel.numAliveRacks();
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        if (!excludedTopics.isEmpty()) {
            int maxReplicationFactorOfIncludedTopics = 1;
            Map<String, Integer> replicationFactorByTopic = clusterModel.replicationFactorByTopic();
            for (Map.Entry<String, Integer> replicationFactorByTopicEntry : replicationFactorByTopic.entrySet()) {
                if (excludedTopics.contains(replicationFactorByTopicEntry.getKey())) {
                    continue;
                }
                // Track the running maximum so the error message reports the replication factor that
                // actually exceeded the rack count.
                maxReplicationFactorOfIncludedTopics = Math.max(maxReplicationFactorOfIncludedTopics, replicationFactorByTopicEntry.getValue());
                if (maxReplicationFactorOfIncludedTopics > numAliveRacks) {
                    throw new OptimizationFailureException(String.format("[%s] Insufficient number of racks to distribute included replicas (Current: %d, Needed: %d).", this.name(), numAliveRacks, maxReplicationFactorOfIncludedTopics));
                }
            }
        } else if (clusterModel.maxReplicationFactor() > numAliveRacks) {
            throw new OptimizationFailureException(String.format("[%s] Insufficient number of racks to distribute each replica (Current: %d, Needed: %d).", this.name(), numAliveRacks, clusterModel.maxReplicationFactor()));
        }
        // The immigrant selection is passed together with the onlyMoveImmigrantReplicas flag; the
        // excluded-topics selection is always added.
        new SortedReplicasHelper()
                .maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrants(), optimizationOptions.onlyMoveImmigrantReplicas())
                .addSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(excludedTopics))
                .trackSortedReplicasFor(GoalUtils.replicaSortName(this, false, false), clusterModel);
    }

    /**
     * Runs the post-optimization sanity checks and then marks the goal as finished.
     *
     * @param clusterModel        cluster model to verify
     * @param optimizationOptions options providing the excluded topics
     * @throws OptimizationFailureException if any of the sanity checks fails
     */
    @Override
    protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        this.ensureRackAware(clusterModel, optimizationOptions);
        GoalUtils.ensureNoOfflineReplicas(clusterModel, this.name());
        GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, this.name());
        this.finish();
    }

    /**
     * Delegates to the five-argument {@code rebalanceForBroker} overload, passing {@code true} as the
     * last argument.
     *
     * @param broker              broker to rebalance
     * @param clusterModel        cluster model being optimized
     * @param optimizedGoals      goals that have already been optimized
     * @param optimizationOptions options for this optimization
     * @throws OptimizationFailureException if the delegated rebalance fails
     */
    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        this.rebalanceForBroker(broker, clusterModel, optimizedGoals, optimizationOptions, true);
    }

    /**
     * Confirms that every partition of a non-excluded topic has its leader and followers in distinct racks.
     *
     * @param clusterModel        cluster model to verify
     * @param optimizationOptions options providing the excluded topics and the mitigation text
     * @throws OptimizationFailureException if two replicas of a checked partition share a rack
     */
    private void ensureRackAware(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        for (Replica leader : clusterModel.leaderReplicas()) {
            if (excludedTopics.contains(leader.topicPartition().topic())) {
                continue;
            }
            Set<String> replicaBrokersRackIds = new HashSet<>();
            Set<Broker> followerBrokers = new HashSet<>(clusterModel.partition(leader.topicPartition()).followerBrokers());
            for (Broker followerBroker : followerBrokers) {
                replicaBrokersRackIds.add(followerBroker.rack().id());
            }
            replicaBrokersRackIds.add(leader.broker().rack().id());
            // One distinct rack id per replica (followers + leader) means no rack is shared.
            if (replicaBrokersRackIds.size() != followerBrokers.size() + 1) {
                String mitigation = GoalUtils.mitigationForOptimizationFailures(optimizationOptions);
                throw new OptimizationFailureException(String.format("Optimization for goal %s failed for rack-awareness of partition %s. Leader (%s) and follower brokers (%s). %s", this.name(), leader.topicPartition(), leader.broker(), followerBrokers, mitigation));
            }
        }
    }

    /**
     * Collects the alive brokers whose rack hosts no other replica of the given replica's partition.
     *
     * @param replica      replica for which eligible destination brokers are requested
     * @param clusterModel cluster model providing the partition and the alive brokers
     * @return eligible brokers, sorted by ascending broker id
     */
    @Override
    protected SortedSet<Broker> rackAwareEligibleBrokers(Replica replica, ClusterModel clusterModel) {
        // Collect into an explicit ArrayList: the list is mutated below, and Collectors.toList() gives
        // no guarantee that the list it returns is mutable.
        List<String> partitionRackIds = clusterModel.partition(replica.topicPartition()).partitionBrokers().stream()
                .map(partitionBroker -> partitionBroker.rack().id())
                .collect(Collectors.toCollection(ArrayList::new));
        // Remove a single occurrence of the replica's own rack id; if another replica of the partition
        // shares that rack, its entry stays in the list and the rack remains ineligible.
        partitionRackIds.remove(replica.broker().rack().id());
        SortedSet<Broker> rackAwareEligibleBrokers = new TreeSet<>((o1, o2) -> Integer.compare(o1.id(), o2.id()));
        for (Broker broker : clusterModel.aliveBrokers()) {
            if (!partitionRackIds.contains(broker.rack().id())) {
                rackAwareEligibleBrokers.add(broker);
            }
        }
        return rackAwareEligibleBrokers;
    }

    /**
     * Tells whether the replica's current rack hosts no other replica of the same partition.
     *
     * @param replica      replica to check
     * @param clusterModel cluster model providing the partition of the replica
     * @return {@code false} if a partition broker with a different id is in the same rack as the
     *         replica's broker; {@code true} otherwise
     */
    @Override
    protected boolean shouldKeepInTheCurrentRack(Replica replica, ClusterModel clusterModel) {
        String myRackId = replica.broker().rack().id();
        int myBrokerId = replica.broker().id();
        for (Broker partitionBroker : clusterModel.partition(replica.topicPartition()).partitionBrokers()) {
            if (myRackId.equals(partitionBroker.rack().id()) && myBrokerId != partitionBroker.id()) {
                return false;
            }
        }
        return true;
    }
}

