/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.model.SortedReplicasHelper;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hard goal that keeps the number of replicas hosted by each broker within
 * {@code _balancingConstraint.maxReplicasPerBroker()}.
 *
 * <p>While the cluster model contains a broker that is not alive or that is in the
 * {@code BAD_DISKS} state, the goal runs in "self healing mode": the per-broker limit is not used
 * to filter candidate destination brokers, and the final capacity verification is deferred until
 * the next call to {@code updateGoalState}.
 */
public class ReplicaCapacityGoal
extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicaCapacityGoal.class);
    // Set in initGoalState when a dead broker or a broker with bad disks is seen; cleared in updateGoalState.
    private boolean _isSelfHealingMode;

    /**
     * Creates the goal with self healing mode disabled. The balancing constraint is not set here.
     */
    public ReplicaCapacityGoal() {
        _isSelfHealingMode = false;
    }

    /**
     * Package-private constructor that sets the balancing constraint directly.
     *
     * @param constraint balancing constraint providing the per-broker replica limit.
     */
    ReplicaCapacityGoal(BalancingConstraint constraint) {
        _balancingConstraint = constraint;
        _isSelfHealingMode = false;
    }

    /**
     * Decides whether the given action is acceptable for this goal.
     *
     * <p>Inter-broker replica movements and replica additions are accepted only when the
     * destination broker currently holds strictly fewer replicas than the per-broker limit.
     * Replica swaps, leadership movements and replica deletions are always accepted.
     *
     * @param action action to check.
     * @param clusterModel cluster model used to look up the destination broker.
     * @return {@code ACCEPT} if the action is acceptable, {@code REPLICA_REJECT} otherwise.
     * @throws IllegalArgumentException if the action type is none of the handled ones.
     */
    @Override
    public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
        switch (action.balancingAction()) {
            case INTER_BROKER_REPLICA_MOVEMENT:
            case REPLICA_ADDITION: {
                Broker destinationBroker = clusterModel.broker(action.destinationBrokerId());
                boolean hasRoom = destinationBroker.replicas().size() < _balancingConstraint.maxReplicasPerBroker();
                return hasRoom ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            }
            case INTER_BROKER_REPLICA_SWAP:
            case LEADERSHIP_MOVEMENT:
            case REPLICA_DELETION: {
                // None of these is treated as adding a replica to a broker, so they are always fine.
                return ActionAcceptance.ACCEPT;
            }
            default: {
                throw new IllegalArgumentException("Unsupported balancing action " + action.balancingAction() + " is provided.");
            }
        }
    }

    /**
     * @return a new {@code GoalUtils.HardGoalStatsComparator}.
     */
    @Override
    public Goal.ClusterModelStatsComparator clusterModelStatsComparator() {
        return new GoalUtils.HardGoalStatsComparator();
    }

    /**
     * @return the model completeness requirements of this goal.
     */
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        // NOTE(review): argument meanings are defined by ModelCompletenessRequirements, which is
        // not visible here - presumably (min windows, min monitored ratio, include all topics).
        return new ModelCompletenessRequirements(1, 0.0, true);
    }

    /**
     * @return the simple class name of this goal.
     */
    @Override
    public String name() {
        return ReplicaCapacityGoal.class.getSimpleName();
    }

    /**
     * @return always {@code true}; this is a hard goal.
     */
    @Override
    public boolean isHardGoal() {
        return true;
    }

    /**
     * @param clusterModel cluster model to take the brokers from.
     * @return every broker of the cluster model.
     */
    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        return clusterModel.brokers();
    }

    /**
     * Validates that the goal can be satisfied at all and prepares the per-broker replica ordering.
     *
     * <p>The method
     * <ul>
     *   <li>warns when every topic of the cluster is excluded,</li>
     *   <li>switches to self healing mode if any broker is not alive or is in the
     *       {@code BAD_DISKS} state,</li>
     *   <li>fails if, on an alive broker, the replicas of excluded topics (minus the current offline
     *       replicas for brokers with bad disks) already exceed the per-broker limit,</li>
     *   <li>fails if the total number of replicas exceeds the per-broker limit multiplied by the
     *       number of alive brokers, and</li>
     *   <li>registers the sorted-replica tracking used by {@code rebalanceForBroker}.</li>
     * </ul>
     *
     * @param clusterModel cluster model to inspect.
     * @param optimizationOptions options providing excluded topics and the immigrant-only flag.
     * @throws OptimizationFailureException if one of the feasibility checks above fails.
     */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        List<String> topicsToRebalance = new ArrayList<>(clusterModel.topics());
        topicsToRebalance.removeAll(excludedTopics);
        if (topicsToRebalance.isEmpty()) {
            LOG.warn("All topics are excluded from {}.", name());
        }

        int totalReplicasInCluster = 0;
        for (Broker broker : brokersToBalance(clusterModel)) {
            totalReplicasInCluster += broker.replicas().size();
            if (!broker.isAlive()) {
                // Dead brokers only contribute to the total count; they trigger self healing mode.
                _isSelfHealingMode = true;
                continue;
            }

            Set<Replica> excludedReplicasInBroker = new HashSet<>();
            for (String topic : excludedTopics) {
                excludedReplicasInBroker.addAll(broker.replicasOfTopicInBroker(topic));
            }
            if (broker.state() == Broker.State.BAD_DISKS) {
                _isSelfHealingMode = true;
                // Current offline replicas are not counted against the excluded-topic check.
                excludedReplicasInBroker.removeAll(broker.currentOfflineReplicas());
            }

            if (excludedReplicasInBroker.size() > _balancingConstraint.maxReplicasPerBroker()) {
                String mitigation = GoalUtils.mitigationForOptimizationFailures(optimizationOptions);
                throw new OptimizationFailureException(String.format("[%s] Replicas of excluded topics in broker: %d exceeds the maximum allowed number of replicas per broker: %d. %s", name(), excludedReplicasInBroker.size(), _balancingConstraint.maxReplicasPerBroker(), mitigation));
            }
        }

        long maxReplicasInCluster = _balancingConstraint.maxReplicasPerBroker() * (long) clusterModel.aliveBrokers().size();
        if (totalReplicasInCluster > maxReplicasInCluster) {
            throw new OptimizationFailureException(String.format("[%s] Total replicas in cluster: %d exceeds the maximum allowed replicas in cluster: %d (Alive brokers: %d, Allowed number of replicas per broker: %d).", name(), totalReplicasInCluster, maxReplicasInCluster, clusterModel.aliveBrokers().size(), _balancingConstraint.maxReplicasPerBroker()));
        }

        new SortedReplicasHelper()
            .maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrants(), optimizationOptions.onlyMoveImmigrantReplicas())
            .addSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(excludedTopics))
            .trackSortedReplicasFor(GoalUtils.replicaSortName(this, false, false), clusterModel);
    }

    /**
     * Checks whether the destination broker of the action still has room for one more replica.
     *
     * @param clusterModel cluster model used to look up the destination broker.
     * @param action action whose destination broker is checked.
     * @return {@code true} if the destination broker holds strictly fewer replicas than the limit.
     */
    @Override
    protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction action) {
        Broker destinationBroker = clusterModel.broker(action.destinationBrokerId());
        return destinationBroker.replicas().size() < _balancingConstraint.maxReplicasPerBroker();
    }

    /**
     * Runs the sanity checks delegated to {@code GoalUtils} and then either verifies the
     * per-broker limit and finishes the goal, or - in self healing mode - only clears the self
     * healing flag without finishing.
     *
     * @param clusterModel cluster model to verify.
     * @param optimizationOptions options used to build the mitigation hint on failure.
     * @throws OptimizationFailureException if a sanity check or the capacity verification fails.
     */
    @Override
    protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        GoalUtils.ensureNoOfflineReplicas(clusterModel, name());
        GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name());
        if (_isSelfHealingMode) {
            // NOTE(review): the goal is not finished here, presumably so that the framework runs
            // another pass with the limit enforced - confirm against AbstractGoal.
            _isSelfHealingMode = false;
            return;
        }
        ensureReplicaCapacitySatisfied(clusterModel, optimizationOptions);
        finish();
    }

    /**
     * Marks this goal as finished.
     */
    @Override
    public void finish() {
        _finished = true;
    }

    /**
     * Verifies that no broker holds more replicas than the per-broker limit.
     *
     * @param clusterModel cluster model whose brokers are checked.
     * @param optimizationOptions options used to build the mitigation hint on failure.
     * @throws OptimizationFailureException if any broker exceeds the limit; the message names the
     *         broker id and its replica count.
     */
    private void ensureReplicaCapacitySatisfied(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        for (Broker broker : brokersToBalance(clusterModel)) {
            int numBrokerReplicas = broker.replicas().size();
            if (numBrokerReplicas > _balancingConstraint.maxReplicasPerBroker()) {
                String mitigation = GoalUtils.mitigationForOptimizationFailures(optimizationOptions);
                // Report the broker id and the count separately; a lone count after "broker" reads as an id.
                throw new OptimizationFailureException(String.format("[%s] Replicas in broker %d: %d exceeds the maximum allowed number of replicas per broker: %d. %s", name(), broker.id(), numBrokerReplicas, _balancingConstraint.maxReplicasPerBroker(), mitigation));
            }
        }
    }

    /**
     * Moves replicas off the given broker until it is within the per-broker limit.
     *
     * <p>Replicas are visited in the order of the tracked sorted replicas registered in
     * {@code initGoalState}. Iteration stops at the first replica that is not currently offline
     * once the broker is within the limit. A failed move is fatal when the broker is dead or the
     * replica is offline; otherwise it is only logged and the next replica is tried.
     *
     * @param broker broker to relieve.
     * @param clusterModel cluster model the actions are applied to.
     * @param optimizedGoals goals that were already optimized.
     * @param optimizationOptions options forwarded to the balancing action.
     * @throws OptimizationFailureException if a dead-broker replica or an offline replica cannot be moved.
     */
    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        LOG.debug("balancing broker {}, optimized goals = {}", broker, optimizedGoals);
        String sortName = GoalUtils.replicaSortName(this, false, false);
        for (Replica replica : broker.trackedSortedReplicas(sortName).sortedReplicas(true)) {
            boolean isReplicaOffline = replica.isCurrentOffline();
            boolean withinLimit = broker.replicas().size() <= _balancingConstraint.maxReplicasPerBroker();
            if (withinLimit && !isReplicaOffline) {
                break;
            }

            List<Broker> eligibleBrokers = eligibleBrokers(replica, clusterModel).stream()
                .map(BrokerReplicaCount::broker)
                .collect(Collectors.toList());
            Broker destination = maybeApplyBalancingAction(clusterModel, replica, eligibleBrokers, ActionType.INTER_BROKER_REPLICA_MOVEMENT, optimizedGoals, optimizationOptions);
            if (destination != null) {
                continue;
            }

            if (!broker.isAlive()) {
                throw new OptimizationFailureException(String.format("[%s] Failed to move dead broker replica %s of partition %s to a broker in %s. Per broker limit: %d for brokers: %s", name(), replica, clusterModel.partition(replica.topicPartition()), eligibleBrokers, _balancingConstraint.maxReplicasPerBroker(), clusterModel.brokers()));
            }
            if (isReplicaOffline) {
                throw new OptimizationFailureException(String.format("[%s] Failed to move offline replica %s of partition %s to a broker in %s. Per broker limit: %d for brokers: %s", name(), replica, clusterModel.partition(replica.topicPartition()), eligibleBrokers, _balancingConstraint.maxReplicasPerBroker(), clusterModel.brokers()));
            }
            LOG.debug("Failed to move replica {} to any broker in {}.", replica, eligibleBrokers);
        }
    }

    /**
     * Collects candidate destination brokers for the given replica.
     *
     * <p>Candidates are alive brokers other than the replica's current broker. Outside self
     * healing mode, brokers that already hold at least the per-broker limit are skipped as well.
     *
     * @param replica replica to be moved.
     * @param clusterModel cluster model providing the alive brokers.
     * @return candidates ordered by ascending replica count, ties broken by broker id.
     */
    private SortedSet<BrokerReplicaCount> eligibleBrokers(Replica replica, ClusterModel clusterModel) {
        SortedSet<BrokerReplicaCount> eligibleBrokers = new TreeSet<>();
        int sourceBrokerId = replica.broker().id();
        for (Broker broker : clusterModel.aliveBrokers()) {
            boolean atCapacity = !_isSelfHealingMode
                && broker.replicas().size() >= _balancingConstraint.maxReplicasPerBroker();
            if (atCapacity || broker.id() == sourceBrokerId) {
                continue;
            }
            eligibleBrokers.add(new BrokerReplicaCount(broker));
        }
        return eligibleBrokers;
    }

    /**
     * Pairs a broker with the replica count it had when this object was created, ordered by that
     * count and then by broker id.
     */
    private static class BrokerReplicaCount
    implements Comparable<BrokerReplicaCount> {
        private final Broker _broker;
        // Snapshot taken at construction time; it does not follow later changes of the broker.
        private final int _replicaCount;

        BrokerReplicaCount(Broker broker) {
            _broker = broker;
            _replicaCount = broker.replicas().size();
        }

        public Broker broker() {
            return _broker;
        }

        int replicaCount() {
            return _replicaCount;
        }

        /**
         * Orders by ascending replica count; equal counts are ordered by ascending broker id.
         */
        @Override
        public int compareTo(BrokerReplicaCount o) {
            int byCount = Integer.compare(_replicaCount, o.replicaCount());
            return byCount != 0 ? byCount : Integer.compare(_broker.id(), o.broker().id());
        }

        /**
         * Two instances are equal when both the replica count and the broker id match, which is
         * consistent with {@link #compareTo(BrokerReplicaCount)}.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            BrokerReplicaCount that = (BrokerReplicaCount) o;
            return _replicaCount == that._replicaCount && _broker.id() == that._broker.id();
        }

        @Override
        public int hashCode() {
            return Objects.hash(_broker.id(), _replicaCount);
        }
    }
}

