/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerUtils;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Partition;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hard goal that places the replicas of every partition position by position (position 0 being the
 * leader), choosing for each position the alive broker that currently holds the fewest replicas at
 * that position and whose rack is not already used by an earlier position of the same partition.
 *
 * <p>The goal keeps per-run state in instance fields, so a single instance is not meant to run
 * several {@link #optimize} calls concurrently.
 */
public class KafkaAssignerEvenRackAwareGoal implements Goal {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaAssignerEvenRackAwareGoal.class);
    /** For each replica position: alive brokers ordered by (replica count at that position, broker id). */
    private final Map<Integer, SortedSet<BrokerReplicaCount>> _aliveBrokerReplicaCountByPosition = new HashMap<>();
    /** Partitions of the cluster model grouped by topic; populated by {@link #initGoalState}. */
    private Map<String, List<Partition>> _partitionsByTopic = null;

    /**
     * Validates that rack awareness is achievable and (re)builds the per-position broker ordering.
     * Replicas of excluded topics are pre-counted on the brokers that host them, so they weigh on the
     * evenness of the distribution even though they are not moved.
     *
     * @param clusterModel the cluster model to be optimized.
     * @param excludedTopics topics whose replicas are not to be moved.
     * @throws OptimizationFailureException if there are fewer alive racks than required.
     */
    private void initGoalState(ClusterModel clusterModel, Set<String> excludedTopics) throws OptimizationFailureException {
        this.ensureRackAwareSatisfiable(clusterModel, excludedTopics);
        this._partitionsByTopic = clusterModel.getPartitionsByTopic();
        // Drop anything left over from a previous run so no stale position survives.
        this._aliveBrokerReplicaCountByPosition.clear();

        // broker id -> (replica position -> number of excluded replicas at that position).
        Map<Integer, Map<Integer, Integer>> numExcludedReplicasByPositionInBroker = new HashMap<>();
        clusterModel.brokers().forEach(broker -> numExcludedReplicasByPositionInBroker.put(broker.id(), new HashMap<>()));
        for (String excludedTopic : excludedTopics) {
            List<Partition> excludedPartitions = this._partitionsByTopic.get(excludedTopic);
            if (excludedPartitions == null) {
                // The excluded topic has no partitions in this cluster model; nothing to count.
                continue;
            }
            for (Partition partition : excludedPartitions) {
                // The leader is counted at position 0, followers at 1..n in iteration order.
                int position = 0;
                numExcludedReplicasByPositionInBroker.get(partition.leader().broker().id()).merge(position, 1, Integer::sum);
                for (Broker followerBroker : partition.followerBrokers()) {
                    numExcludedReplicasByPositionInBroker.get(followerBroker.id()).merge(++position, 1, Integer::sum);
                }
            }
        }

        int maxReplicationFactor = clusterModel.maxReplicationFactor();
        for (int position = 0; position < maxReplicationFactor; ++position) {
            SortedSet<BrokerReplicaCount> aliveBrokersByReplicaCount = new TreeSet<>();
            for (Broker aliveBroker : clusterModel.aliveBrokers()) {
                int numExcludedReplicasInPosition =
                    numExcludedReplicasByPositionInBroker.get(aliveBroker.id()).getOrDefault(position, 0);
                aliveBrokersByReplicaCount.add(new BrokerReplicaCount(aliveBroker, numExcludedReplicasInPosition));
            }
            this._aliveBrokerReplicaCountByPosition.put(position, aliveBrokersByReplicaCount);
        }
    }

    /**
     * Runs the goal: makes the leader the first replica of every partition, then assigns replicas
     * position by position, and finally verifies that no offline replica remains and that every
     * non-excluded partition is rack aware.
     *
     * @param clusterModel the cluster model to optimize; it is modified in place.
     * @param optimizedGoals goals optimized before this one; must be empty.
     * @param optimizationOptions options carrying, among others, the excluded topics.
     * @return {@code true} when the optimization completes.
     * @throws IllegalArgumentException if {@code optimizedGoals} is not empty.
     * @throws KafkaCruiseControlException if a replica cannot be placed or a final check fails.
     */
    @Override
    public boolean optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) throws KafkaCruiseControlException {
        KafkaAssignerUtils.sanityCheckOptimizationOptions(optimizationOptions);
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        LOG.debug("Starting {} with excluded topics = {}", this.name(), excludedTopics);
        if (!optimizedGoals.isEmpty()) {
            throw new IllegalArgumentException(String.format("Goals %s cannot be optimized before %s.", optimizedGoals, this.name()));
        }
        this.initGoalState(clusterModel, excludedTopics);

        // Position 0 is treated as the leader position below, so put the leader there first.
        for (List<Partition> partitions : this._partitionsByTopic.values()) {
            for (Partition partition : partitions) {
                if (partition.replicas().get(0) != partition.leader()) {
                    partition.swapReplicaPositions(0, partition.replicas().indexOf(partition.leader()));
                }
            }
        }

        // Handle all replicas of one position across the cluster before moving to the next position.
        int maxReplicationFactor = clusterModel.maxReplicationFactor();
        for (int position = 0; position < maxReplicationFactor; ++position) {
            for (List<Partition> partitions : this._partitionsByTopic.values()) {
                for (Partition partition : partitions) {
                    if (partition.replicas().size() <= position || this.shouldExclude(partition, position, excludedTopics)) {
                        continue;
                    }
                    if (!this.maybeApplyMove(clusterModel, partition, position)) {
                        throw new OptimizationFailureException(String.format("[%s] Unable to apply move for replica %s.", this.name(), this.replicaAtPosition(partition, position)));
                    }
                }
            }
        }
        GoalUtils.ensureNoOfflineReplicas(clusterModel, this.name());
        this.ensureRackAware(clusterModel, excludedTopics);
        return true;
    }

    /**
     * Tries to place the replica at {@code replicaPosition} of {@code partition} on the least loaded
     * eligible alive broker for that position. A broker is eligible when its rack does not host any
     * replica of the partition at an earlier position.
     *
     * @param clusterModel the cluster model to apply the action on.
     * @param partition the partition whose replica is being placed.
     * @param replicaPosition the index of the replica in {@code partition.replicas()}.
     * @return {@code true} if a broker was selected for the position, {@code false} otherwise.
     */
    private boolean maybeApplyMove(ClusterModel clusterModel, Partition partition, int replicaPosition) {
        // Racks already taken by the replicas at earlier positions of this partition.
        Set<String> ineligibleRackIds = new HashSet<>();
        for (int pos = 0; pos < replicaPosition; ++pos) {
            ineligibleRackIds.add(this.replicaAtPosition(partition, pos).broker().rack().id());
        }

        Replica replicaAtPosition = this.replicaAtPosition(partition, replicaPosition);
        BrokerReplicaCount eligibleBrokerReplicaCount = null;
        Iterator<BrokerReplicaCount> it = this._aliveBrokerReplicaCountByPosition.get(replicaPosition).iterator();
        while (it.hasNext()) {
            BrokerReplicaCount destinationBrokerReplicaCount = it.next();
            Broker destinationBroker = destinationBrokerReplicaCount.broker();
            if (ineligibleRackIds.contains(destinationBroker.rack().id())) {
                continue;
            }
            Replica destinationReplica = destinationBroker.replica(partition.topicPartition());
            if (destinationReplica == null) {
                LOG.trace("Destination broker {} has no other replica from the same partition, move the replica {} to there.", destinationBroker, replicaAtPosition);
                this.applyBalancingAction(clusterModel, replicaAtPosition, destinationBroker, ActionType.INTER_BROKER_REPLICA_MOVEMENT);
            } else if (destinationBroker.id() != replicaAtPosition.broker().id() && replicaAtPosition.broker().isAlive()) {
                LOG.trace("Destination broker has a replica {} with a larger position than source replica {}, swap positions.", destinationReplica, replicaAtPosition);
                if (replicaPosition == 0) {
                    this.applyBalancingAction(clusterModel, replicaAtPosition, destinationBroker, ActionType.LEADERSHIP_MOVEMENT);
                } else {
                    int destinationPos = this.followerPosition(partition, destinationBroker.id());
                    partition.swapFollowerPositions(replicaPosition, destinationPos);
                }
            } else if (!replicaAtPosition.broker().isAlive()) {
                LOG.trace("Source broker {} is dead and either the destination broker {} is the same as the source, or has a replica from the same partition.", replicaAtPosition.broker(), destinationBroker);
                continue;
            }
            // Remaining case: the replica already sits on the (alive) destination broker; keep it there.
            eligibleBrokerReplicaCount = destinationBrokerReplicaCount;
            // Take the entry out before its count changes; it is re-inserted below so the sorted set
            // re-orders it according to the new count.
            it.remove();
            break;
        }

        if (eligibleBrokerReplicaCount == null) {
            return false;
        }
        eligibleBrokerReplicaCount.incReplicaCount();
        this._aliveBrokerReplicaCountByPosition.get(replicaPosition).add(eligibleBrokerReplicaCount);
        return true;
    }

    /**
     * Returns the index, within {@code partition.replicas()}, of the replica hosted by the given broker.
     *
     * @param partition the partition to search.
     * @param brokerId the id of the broker hosting the replica.
     * @return the position of the replica on that broker.
     * @throws IllegalArgumentException if the partition has no replica on the broker.
     */
    private int followerPosition(Partition partition, int brokerId) {
        int followerPos = 0;
        for (Replica replica : partition.replicas()) {
            if (replica.broker().id() == brokerId) {
                return followerPos;
            }
            ++followerPos;
        }
        throw new IllegalArgumentException(String.format("Partition %s has no follower on %d.", partition, brokerId));
    }

    /**
     * Applies a leadership movement or an inter-broker replica movement to the cluster model.
     * Any other action type is ignored.
     *
     * @param clusterModel the cluster model to modify.
     * @param sourceReplica the replica to move (or whose leadership is moved).
     * @param destinationBroker the broker receiving the replica or the leadership.
     * @param action the type of action to apply.
     */
    private void applyBalancingAction(ClusterModel clusterModel, Replica sourceReplica, Broker destinationBroker, ActionType action) {
        if (action == ActionType.LEADERSHIP_MOVEMENT) {
            clusterModel.relocateLeadership(sourceReplica.topicPartition(), sourceReplica.broker().id(), destinationBroker.id());
        } else if (action == ActionType.INTER_BROKER_REPLICA_MOVEMENT) {
            clusterModel.relocateReplica(sourceReplica.topicPartition(), sourceReplica.broker().id(), destinationBroker.id());
        }
    }

    /**
     * @param sourcePartition the partition to read from.
     * @param position the index in the partition's replica list.
     * @return the replica at the given position.
     */
    private Replica replicaAtPosition(Partition sourcePartition, int position) {
        return sourcePartition.replicas().get(position);
    }

    /**
     * A replica is left untouched when its topic is excluded, unless it was originally offline.
     *
     * @param partition the partition holding the replica.
     * @param position the position of the replica within the partition.
     * @param excludedTopics the topics excluded from movement.
     * @return {@code true} if the replica at the position must not be moved.
     */
    private boolean shouldExclude(Partition partition, int position, Set<String> excludedTopics) {
        Replica replica = this.replicaAtPosition(partition, position);
        return excludedTopics.contains(replica.topicPartition().topic()) && !replica.isOriginalOffline();
    }

    /**
     * Checks that the number of alive racks is at least the maximum replication factor among the
     * topics that take part in the optimization.
     *
     * @param clusterModel the cluster model to check.
     * @param excludedTopics topics that are not considered for the requirement.
     * @throws OptimizationFailureException if there are not enough alive racks.
     */
    private void ensureRackAwareSatisfiable(ClusterModel clusterModel, Set<String> excludedTopics) throws OptimizationFailureException {
        int numAliveRacks = clusterModel.numAliveRacks();
        if (!excludedTopics.isEmpty()) {
            int maxReplicationFactorOfIncludedTopics = 1;
            Map<String, Integer> replicationFactorByTopic = clusterModel.replicationFactorByTopic();
            for (Map.Entry<String, Integer> replicationFactorByTopicEntry : replicationFactorByTopic.entrySet()) {
                if (excludedTopics.contains(replicationFactorByTopicEntry.getKey())) {
                    continue;
                }
                maxReplicationFactorOfIncludedTopics =
                    Math.max(maxReplicationFactorOfIncludedTopics, replicationFactorByTopicEntry.getValue());
                if (maxReplicationFactorOfIncludedTopics > numAliveRacks) {
                    throw new OptimizationFailureException(String.format("[%s] Insufficient number of racks to distribute included replicas (Current: %d, Needed: %d).", this.name(), numAliveRacks, maxReplicationFactorOfIncludedTopics));
                }
            }
        } else if (clusterModel.maxReplicationFactor() > numAliveRacks) {
            throw new OptimizationFailureException(String.format("[%s] Insufficient number of racks to distribute each replica (Current: %d, Needed: %d).", this.name(), numAliveRacks, clusterModel.maxReplicationFactor()));
        }
    }

    /**
     * Verifies that, for every partition of a non-excluded topic, the leader and all followers sit
     * on pairwise distinct racks.
     *
     * @param clusterModel the cluster model to verify.
     * @param excludedTopics topics that are skipped by the check.
     * @throws OptimizationFailureException if a partition has two replicas on the same rack.
     */
    private void ensureRackAware(ClusterModel clusterModel, Set<String> excludedTopics) throws OptimizationFailureException {
        for (Replica leader : clusterModel.leaderReplicas()) {
            if (excludedTopics.contains(leader.topicPartition().topic())) {
                continue;
            }
            Set<String> replicaBrokersRackIds = new HashSet<>();
            Set<Broker> followerBrokers = new HashSet<>(clusterModel.partition(leader.topicPartition()).followerBrokers());
            for (Broker followerBroker : followerBrokers) {
                replicaBrokersRackIds.add(followerBroker.rack().id());
            }
            replicaBrokersRackIds.add(leader.broker().rack().id());
            // One distinct rack per replica (followers + leader) means no rack is shared.
            if (replicaBrokersRackIds.size() != followerBrokers.size() + 1) {
                throw new OptimizationFailureException("Optimization for goal " + this.name() + " failed for rack-awareness of partition " + leader.topicPartition());
            }
        }
    }

    /**
     * Decides whether an action proposed by another goal keeps the partition rack aware.
     * Leadership movements are always accepted; replica movements and swaps are rejected when the
     * destination rack already hosts another replica of the same partition.
     *
     * @param action the proposed balancing action.
     * @param clusterModel the current cluster model.
     * @return the acceptance verdict for the action.
     * @throws IllegalArgumentException if the action type is not one of the supported types.
     */
    @Override
    public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
        switch (action.balancingAction()) {
            case LEADERSHIP_MOVEMENT: {
                return ActionAcceptance.ACCEPT;
            }
            case INTER_BROKER_REPLICA_MOVEMENT:
            case INTER_BROKER_REPLICA_SWAP: {
                if (this.isReplicaMoveViolateRackAwareness(clusterModel, c -> c.broker(action.sourceBrokerId()).replica(action.topicPartition()), c -> c.broker(action.destinationBrokerId()))) {
                    return ActionAcceptance.BROKER_REJECT;
                }
                // A swap also moves the destination replica back to the source broker; check that direction too.
                if (action.balancingAction() == ActionType.INTER_BROKER_REPLICA_SWAP && this.isReplicaMoveViolateRackAwareness(clusterModel, c -> c.broker(action.destinationBrokerId()).replica(action.destinationTopicPartition()), c -> c.broker(action.sourceBrokerId()))) {
                    return ActionAcceptance.REPLICA_REJECT;
                }
                return ActionAcceptance.ACCEPT;
            }
        }
        throw new IllegalArgumentException("Unsupported balancing action " + action.balancingAction() + " is provided.");
    }

    /**
     * Checks whether moving a replica to a destination broker would put it on a rack that already
     * hosts another replica of the same partition.
     *
     * @param clusterModel the current cluster model.
     * @param sourceReplicaFunction resolves the replica to move from the cluster model.
     * @param destinationBrokerFunction resolves the destination broker from the cluster model.
     * @return {@code true} if the move would violate rack awareness.
     */
    private boolean isReplicaMoveViolateRackAwareness(ClusterModel clusterModel, Function<ClusterModel, Replica> sourceReplicaFunction, Function<ClusterModel, Broker> destinationBrokerFunction) {
        Replica sourceReplica = sourceReplicaFunction.apply(clusterModel);
        Broker destinationBroker = destinationBrokerFunction.apply(clusterModel);
        // NOTE(review): the returned set is modified here; this assumes partitionBrokers() hands out a
        // caller-owned mutable set rather than a view of the partition's state - confirm.
        Set<Broker> partitionBrokers = clusterModel.partition(sourceReplica.topicPartition()).partitionBrokers();
        // The source broker is vacated by the move, so its rack does not count against the destination.
        partitionBrokers.remove(sourceReplica.broker());
        for (Broker broker : partitionBrokers) {
            if (broker.rack().brokers().contains(destinationBroker)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the model completeness requirements of this goal.
     */
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(1, 0.0, true);
    }

    /**
     * @return the simple class name of this goal.
     */
    @Override
    public String name() {
        return KafkaAssignerEvenRackAwareGoal.class.getSimpleName();
    }

    /** No-op: this goal has nothing to finalize. */
    @Override
    public void finish() {
    }

    /**
     * @return {@code true}; this goal is a hard goal.
     */
    @Override
    public boolean isHardGoal() {
        return true;
    }

    /**
     * @return a new {@link GoalUtils.HardGoalStatsComparator}.
     */
    @Override
    public Goal.ClusterModelStatsComparator clusterModelStatsComparator() {
        return new GoalUtils.HardGoalStatsComparator();
    }

    /**
     * No-op: this goal reads no configuration.
     *
     * @param configs the configuration map (unused).
     */
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Pairs a broker with the number of replicas it holds at one replica position. Instances are
     * ordered by replica count first and broker id second.
     *
     * <p>The count is mutable and takes part in ordering and equality, so an instance must be taken
     * out of any sorted or hashed collection before {@link #incReplicaCount()} is called and
     * re-inserted afterwards.
     */
    private static class BrokerReplicaCount implements Comparable<BrokerReplicaCount> {
        private final Broker _broker;
        private int _replicaCount;

        BrokerReplicaCount(Broker broker, int replicaCount) {
            this._broker = broker;
            this._replicaCount = replicaCount;
        }

        public Broker broker() {
            return this._broker;
        }

        int replicaCount() {
            return this._replicaCount;
        }

        void incReplicaCount() {
            ++this._replicaCount;
        }

        @Override
        public int compareTo(BrokerReplicaCount o) {
            int byCount = Integer.compare(this._replicaCount, o.replicaCount());
            if (byCount != 0) {
                return byCount;
            }
            // Tie-break on broker id so distinct brokers with equal counts stay distinct in a sorted set.
            return Integer.compare(this._broker.id(), o.broker().id());
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            BrokerReplicaCount that = (BrokerReplicaCount) o;
            return this._replicaCount == that._replicaCount && this._broker.id() == that._broker.id();
        }

        @Override
        public int hashCode() {
            return Objects.hash(this._broker.id(), this._replicaCount);
        }
    }
}

