/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Load;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.model.SortedReplicasHelper;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CapacityGoal
extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(CapacityGoal.class);

    /**
     * Creates a capacity goal without setting a balancing constraint in this constructor.
     */
    public CapacityGoal() {
        super();
    }

    /**
     * Reports whether this goal is a hard goal.
     *
     * @return always {@code true}; every capacity goal is a hard goal.
     */
    @Override
    public boolean isHardGoal() {
        return true;
    }

    /**
     * Package-private constructor that installs the given balancing constraint directly.
     *
     * @param constraint the balancing constraint this goal reads its capacity thresholds from.
     */
    CapacityGoal(BalancingConstraint constraint) {
        _balancingConstraint = constraint;
    }

    /**
     * Identifies the resource whose capacity this goal enforces.
     *
     * @return the resource used for every utilization and capacity lookup in this class.
     */
    protected abstract Resource resource();

    /**
     * Decides whether the given balancing action is acceptable with respect to the capacity limit
     * of this goal's resource.
     *
     * <p>Swaps are checked with {@code isSwapAcceptableForCapacity}; replica movements and leadership
     * movements are checked with {@code isMovementAcceptableForCapacity}.
     *
     * @param action       the action to evaluate.
     * @param clusterModel the cluster model used to resolve the source replica and destination broker.
     * @return {@link ActionAcceptance#ACCEPT} if the action passes the capacity check,
     *         {@link ActionAcceptance#REPLICA_REJECT} otherwise.
     * @throws IllegalArgumentException if the action type is not one of the three supported types.
     */
    @Override
    public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
        final Broker sourceBroker = clusterModel.broker(action.sourceBrokerId());
        final Replica sourceReplica = sourceBroker.replica(action.topicPartition());
        final Broker destinationBroker = clusterModel.broker(action.destinationBrokerId());

        final boolean acceptable;
        switch (action.balancingAction()) {
            case INTER_BROKER_REPLICA_SWAP:
                acceptable = isSwapAcceptableForCapacity(
                    sourceReplica, destinationBroker.replica(action.destinationTopicPartition()));
                break;
            case INTER_BROKER_REPLICA_MOVEMENT:
            case LEADERSHIP_MOVEMENT:
                acceptable = isMovementAcceptableForCapacity(sourceReplica, destinationBroker);
                break;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + action.balancingAction() + " is provided.");
        }
        return acceptable ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
    }

    /**
     * Supplies the comparator used to compare cluster model stats for this goal.
     *
     * @return a new {@code GoalUtils.HardGoalStatsComparator} on every call.
     */
    @Override
    public Goal.ClusterModelStatsComparator clusterModelStatsComparator() {
        final Goal.ClusterModelStatsComparator hardGoalComparator = new GoalUtils.HardGoalStatsComparator();
        return hardGoalComparator;
    }

    /**
     * Builds the model completeness requirements for this goal.
     *
     * @return a new requirements object built from the constant {@code 1}, the configured
     *         {@code _minMonitoredPartitionPercentage}, and {@code true}.
     */
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        // NOTE(review): the meaning of the first and third arguments is defined by
        // ModelCompletenessRequirements, which is not visible from this file — confirm there.
        return new ModelCompletenessRequirements(
            1,
            _minMonitoredPartitionPercentage,
            true);
    }

    /**
     * Names this goal. Within this class the value is used as a prefix/identifier in
     * optimization failure messages and is passed to the {@code GoalUtils} sanity checks.
     *
     * @return the name of the concrete goal.
     */
    @Override
    public abstract String name();

    /**
     * Checks whether applying the given action keeps this goal satisfied, i.e. whether the
     * destination broker can take the source replica's load without reaching the capacity limit.
     *
     * @param clusterModel the cluster model used to resolve the brokers and replica named by the action.
     * @param action       the action to evaluate; it is always treated as a movement to the destination broker.
     * @return {@code true} if the movement is acceptable for capacity, {@code false} otherwise.
     */
    @Override
    protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction action) {
        final Broker sourceBroker = clusterModel.broker(action.sourceBrokerId());
        final Replica replicaToMove = sourceBroker.replica(action.topicPartition());
        final Broker destination = clusterModel.broker(action.destinationBrokerId());
        return isMovementAcceptableForCapacity(replicaToMove, destination);
    }

    /**
     * Selects the brokers this goal will try to balance.
     *
     * @param clusterModel the cluster model to read brokers from.
     * @return the set returned by {@code clusterModel.brokers()}, unfiltered.
     */
    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        final SortedSet<Broker> allBrokers = clusterModel.brokers();
        return allBrokers;
    }

    /**
     * Prepares the goal before any broker is rebalanced.
     *
     * <p>First verifies that the cluster-wide capacity for this goal's resource, scaled by the
     * capacity threshold, is not smaller than the existing cluster utilization. Then registers two
     * tracked sorted-replica views on the cluster model: one over all selected replicas and one
     * restricted to leaders. Both are looked up again by name in {@code rebalanceForBroker}.
     *
     * @param clusterModel        the cluster model to validate and to register the sorted replicas on.
     * @param optimizationOptions supplies the excluded topics and the "only move immigrant replicas" flag.
     * @throws OptimizationFailureException if the allowed cluster capacity is below the existing utilization.
     */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        final Resource resource = resource();

        // Cluster-wide feasibility check: no amount of shuffling helps if the total load does not fit.
        final double existingUtilization = clusterModel.load().expectedUtilizationFor(resource);
        final double clusterCapacity = clusterModel.capacityFor(resource);
        final double capacityThreshold = _balancingConstraint.capacityThreshold(resource);
        final double allowedCapacity = clusterCapacity * capacityThreshold;
        if (allowedCapacity < existingUtilization) {
            throw new OptimizationFailureException(String.format(
                "[%s] Insufficient healthy cluster capacity for resource: %s existing cluster utilization %f allowed capacity %f (capacity threshold: %f).",
                name(), resource, existingUtilization, allowedCapacity, capacityThreshold));
        }

        final Set<String> excludedTopics = optimizationOptions.excludedTopics();
        final boolean immigrantsOnly = optimizationOptions.onlyMoveImmigrantReplicas();

        // View over replicas in general; offline replicas get priority here.
        new SortedReplicasHelper()
            .maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrants(), immigrantsOnly)
            .addSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(excludedTopics))
            .addPriorityFunc(ReplicaSortFunctionFactory.prioritizeOfflineReplicas())
            .maybeAddPriorityFunc(ReplicaSortFunctionFactory.prioritizeImmigrants(), !immigrantsOnly)
            .setScoreFunc(ReplicaSortFunctionFactory.reverseSortByMetricGroupValue(resource.name()))
            .trackSortedReplicasFor(GoalUtils.replicaSortName(this, true, false), clusterModel);

        // View restricted to leaders; no offline-replica priority is applied to this one.
        new SortedReplicasHelper()
            .addSelectionFunc(ReplicaSortFunctionFactory.selectLeaders())
            .maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrants(), immigrantsOnly)
            .addSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(excludedTopics))
            .maybeAddPriorityFunc(ReplicaSortFunctionFactory.prioritizeImmigrants(), !immigrantsOnly)
            .setScoreFunc(ReplicaSortFunctionFactory.reverseSortByMetricGroupValue(resource.name()))
            .trackSortedReplicasFor(GoalUtils.replicaSortName(this, true, true), clusterModel);
    }

    /**
     * Runs the final checks for this goal and then marks it finished.
     *
     * <p>The utilization of every broker (and host, where applicable) is verified against the
     * capacity limit first, followed by two {@code GoalUtils} sanity checks that, judging by their
     * names, cover offline replicas and brokers with bad disks.
     *
     * @param clusterModel        the cluster model to verify.
     * @param optimizationOptions options forwarded to the capacity check for building failure messages.
     * @throws OptimizationFailureException if any of the checks fails.
     */
    @Override
    protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        ensureUtilizationUnderCapacity(clusterModel, optimizationOptions);

        final String goalName = name();
        GoalUtils.ensureNoOfflineReplicas(clusterModel, goalName);
        GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, goalName);

        finish();
    }

    /**
     * Marks this goal as finished by setting the {@code _finished} flag.
     */
    @Override
    public void finish() {
        _finished = true;
    }

    /**
     * Verifies that no host or broker holding replicas exceeds the capacity limit for this goal's resource.
     *
     * <p>For each broker, the host-level check runs when the resource is a host resource and the
     * broker-level check runs when it is a broker resource; a resource may trigger both. Hosts and
     * brokers without replicas are never reported.
     *
     * @param clusterModel        the cluster model whose brokers are checked.
     * @param optimizationOptions used to build the mitigation hint appended to the failure message.
     * @throws OptimizationFailureException on the first host or broker found above its capacity limit.
     */
    private void ensureUtilizationUnderCapacity(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        final Resource resource = resource();
        final double capacityThreshold = _balancingConstraint.capacityThreshold(resource);

        for (Broker broker : clusterModel.brokers()) {
            if (resource.isHostResource()) {
                final double hostUtilization = broker.host().load().expectedUtilizationFor(resource);
                final double hostLimit = broker.host().capacityFor(resource) * capacityThreshold;
                if (!broker.host().replicas().isEmpty() && hostUtilization > hostLimit) {
                    final String hostMitigation = GoalUtils.mitigationForOptimizationFailures(optimizationOptions);
                    throw new OptimizationFailureException(String.format(
                        "Optimization for goal %s failed because %s utilization for host %s is %f which is above capacity limit %f. %s",
                        name(), resource, broker.host().name(), hostUtilization, hostLimit, hostMitigation));
                }
            }

            if (resource.isBrokerResource()) {
                final double brokerUtilization = broker.load().expectedUtilizationFor(resource);
                final double brokerLimit = broker.capacityFor(resource) * capacityThreshold;
                if (!broker.replicas().isEmpty() && brokerUtilization > brokerLimit) {
                    final String brokerMitigation = GoalUtils.mitigationForOptimizationFailures(optimizationOptions);
                    throw new OptimizationFailureException(String.format(
                        "Optimization for goal %s failed because %s utilization for broker %d is %f which is above capacity limit %f. %s",
                        name(), resource, broker.id(), brokerUtilization, brokerLimit, brokerMitigation));
                }
            }
        }
    }

    /**
     * Tries to bring the given broker under its capacity limit and to move its offline replicas away.
     *
     * <p>Nothing is done when the broker is within the limit and has no offline replicas. Otherwise:
     * <ol>
     *   <li>For {@code NW_OUT} and {@code CPU} only, leadership of the broker's tracked leader replicas
     *       is offered to the brokers of the online followers, until the broker is no longer over the limit.</li>
     *   <li>If the broker is still over the limit or still has offline replicas, its tracked replicas are
     *       offered to the alive brokers under the capacity threshold, until both conditions are cleared.</li>
     *   <li>{@code postSanityCheck} throws if either condition still holds.</li>
     * </ol>
     *
     * @param broker              the broker to rebalance.
     * @param clusterModel        the cluster model that actions are applied to.
     * @param optimizedGoals      goals already optimized; forwarded to {@code maybeApplyBalancingAction}.
     * @param optimizationOptions options forwarded to {@code maybeApplyBalancingAction} and the final check.
     * @throws OptimizationFailureException if the broker remains over the limit or keeps offline replicas.
     */
    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        LOG.debug("balancing broker {}, optimized goals = {}", broker, optimizedGoals);

        final Resource resource = resource();
        final double threshold = _balancingConstraint.capacityThreshold(resource);
        final double brokerLimit = broker.capacityFor(resource) * threshold;
        final double hostLimit = broker.host().capacityFor(resource) * threshold;

        boolean overLimit = isUtilizationOverLimit(broker, resource, brokerLimit, hostLimit);
        if (!overLimit && broker.currentOfflineReplicas().isEmpty()) {
            // Already within capacity and nothing offline to relocate.
            return;
        }

        // Step 1: leadership movement, attempted only for NW_OUT and CPU.
        if (resource == Resource.NW_OUT || resource == Resource.CPU) {
            for (Replica leader : broker.trackedSortedReplicas(GoalUtils.replicaSortName(this, true, true)).sortedReplicas(true)) {
                List<Replica> followers = clusterModel.partition(leader.topicPartition()).onlineFollowers();
                GoalUtils.sortReplicasInAscendingOrderByBrokerResourceUtilization(followers, resource);
                List<Broker> candidateBrokers = followers.stream().map(Replica::broker).collect(Collectors.toList());

                Broker newLeaderBroker = maybeApplyBalancingAction(
                    clusterModel, leader, candidateBrokers, ActionType.LEADERSHIP_MOVEMENT, optimizedGoals, optimizationOptions);
                if (newLeaderBroker == null) {
                    LOG.debug("Failed to move leader replica {} to any other brokers in {}", leader, candidateBrokers);
                }

                overLimit = isUtilizationOverLimit(broker, resource, brokerLimit, hostLimit);
                if (!overLimit) {
                    break;
                }
            }
        }

        // Step 2: replica movement, needed while the broker is over the limit or still holds offline replicas.
        if (overLimit || !broker.currentOfflineReplicas().isEmpty()) {
            List<Broker> brokersUnderLimit = clusterModel.sortedAliveBrokersUnderThreshold(resource, threshold);
            for (Replica replica : broker.trackedSortedReplicas(GoalUtils.replicaSortName(this, true, false)).sortedReplicas(true)) {
                Broker newBroker = maybeApplyBalancingAction(
                    clusterModel, replica, brokersUnderLimit, ActionType.INTER_BROKER_REPLICA_MOVEMENT, optimizedGoals, optimizationOptions);
                if (newBroker == null) {
                    LOG.debug("Failed to move replica {} to any broker in {}", replica, brokersUnderLimit);
                }

                overLimit = isUtilizationOverLimit(broker, resource, brokerLimit, hostLimit);
                if (!overLimit && broker.currentOfflineReplicas().isEmpty()) {
                    break;
                }
            }
        }

        postSanityCheck(overLimit, broker, brokerLimit, hostLimit, optimizationOptions);
    }

    /**
     * Fails the optimization if rebalancing left the broker over its capacity limit or with offline replicas.
     *
     * <p>When the utilization is over the limit, the failure message names the level that is actually
     * violated. A resource that is only a host resource is always reported at host level. A resource
     * that is both a host and a broker resource is reported at host level only when the host utilization
     * exceeds the host limit; otherwise the violation must come from the broker-level check and is
     * reported at broker level. Previously such a broker-only violation was reported with the host
     * message and a host utilization that was within its limit.
     *
     * @param utilizationOverLimit whether the broker is still over the limit after rebalancing.
     * @param broker               the broker that was rebalanced.
     * @param brokerCapacityLimit  the broker-level capacity limit for this goal's resource.
     * @param hostCapacityLimit    the host-level capacity limit for this goal's resource.
     * @param optimizationOptions  used to build the mitigation hint appended to the failure message.
     * @throws OptimizationFailureException if the utilization is over the limit or offline replicas remain.
     */
    private void postSanityCheck(boolean utilizationOverLimit, Broker broker, double brokerCapacityLimit, double hostCapacityLimit, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        if (utilizationOverLimit) {
            Resource currentResource = resource();
            String mitigation = GoalUtils.mitigationForOptimizationFailures(optimizationOptions);

            // Host-only resources can only be violated at host level. For resources checked at both
            // levels, blame the host only if the host itself is over its limit.
            boolean hostLevelViolation = currentResource.isHostResource()
                && (!currentResource.isBrokerResource()
                    || broker.host().load().expectedUtilizationFor(currentResource) > hostCapacityLimit);

            if (hostLevelViolation) {
                throw new OptimizationFailureException(String.format(
                    "[%s] Violated capacity limit of %f via host utilization of %f with hostname %s for resource %s. %s",
                    name(), hostCapacityLimit, broker.host().load().expectedUtilizationFor(currentResource),
                    broker.host().name(), currentResource, mitigation));
            }
            throw new OptimizationFailureException(String.format(
                "[%s] Violated capacity limit of %f via broker utilization of %f with broker %d for resource %s. %s",
                name(), brokerCapacityLimit, broker.load().expectedUtilizationFor(currentResource),
                broker.id(), currentResource, mitigation));
        }
        if (!broker.currentOfflineReplicas().isEmpty()) {
            throw new OptimizationFailureException("Failed to remove offline replicas from broker " + broker.id() + ".");
        }
    }

    /**
     * Tells whether the broker, or its host, currently exceeds the given capacity limits.
     *
     * <p>The host-level check applies only to host resources on hosts that hold replicas; the
     * broker-level check applies only to broker resources on brokers that hold replicas. A host-level
     * violation short-circuits the broker-level check.
     *
     * @param broker              the broker to inspect.
     * @param resource            the resource whose utilization is compared.
     * @param brokerCapacityLimit the broker-level limit.
     * @param hostCapacityLimit   the host-level limit.
     * @return {@code true} if either applicable utilization is strictly greater than its limit.
     */
    private boolean isUtilizationOverLimit(Broker broker, Resource resource, double brokerCapacityLimit, double hostCapacityLimit) {
        if (!broker.host().replicas().isEmpty() && resource.isHostResource()) {
            final double hostUtilization = broker.host().load().expectedUtilizationFor(resource);
            if (hostUtilization > hostCapacityLimit) {
                return true;
            }
        }
        if (!broker.replicas().isEmpty() && resource.isBrokerResource()) {
            final double brokerUtilization = broker.load().expectedUtilizationFor(resource);
            return brokerUtilization > brokerCapacityLimit;
        }
        return false;
    }

    /**
     * Checks whether the destination broker can take on the source replica's full expected
     * utilization of this goal's resource while staying under the capacity limit.
     *
     * @param sourceReplica     the replica whose load would be added.
     * @param destinationBroker the broker that would receive the load.
     * @return {@code true} if the destination stays under the limit after adding the replica's load.
     */
    private boolean isMovementAcceptableForCapacity(Replica sourceReplica, Broker destinationBroker) {
        final double addedLoad = sourceReplica.load().expectedUtilizationFor(resource());
        final boolean staysUnderLimit = isUtilizationUnderLimitAfterAddingLoad(destinationBroker, addedLoad);
        return staysUnderLimit;
    }

    /**
     * Checks whether swapping the two replicas keeps the broker that gains load under the capacity limit.
     *
     * <p>Only one side of a swap gains load: if the destination replica is heavier, the source replica's
     * broker gains the difference; otherwise the destination replica's broker gains it (possibly zero).
     *
     * @param sourceReplica      the replica currently on the source broker.
     * @param destinationReplica the replica currently on the destination broker.
     * @return {@code true} if the broker receiving the net load stays under the limit.
     */
    private boolean isSwapAcceptableForCapacity(Replica sourceReplica, Replica destinationReplica) {
        final double sourceLoad = sourceReplica.load().expectedUtilizationFor(resource());
        final double destinationLoad = destinationReplica.load().expectedUtilizationFor(resource());
        final double loadAddedToSource = destinationLoad - sourceLoad;

        if (loadAddedToSource > 0.0) {
            return isUtilizationUnderLimitAfterAddingLoad(sourceReplica.broker(), loadAddedToSource);
        }
        return isUtilizationUnderLimitAfterAddingLoad(destinationReplica.broker(), -loadAddedToSource);
    }

    /**
     * Checks whether the broker, and its host where applicable, would stay strictly under the
     * capacity limit for this goal's resource after receiving the given additional load.
     *
     * <p>Unlike {@code isUtilizationOverLimit}, which tolerates utilization equal to the limit,
     * this check rejects a resulting utilization that is equal to the limit.
     *
     * @param destinationBroker  the broker that would receive the load.
     * @param replicaUtilization the load to add.
     * @return {@code false} if any applicable host- or broker-level utilization would reach or exceed
     *         its limit; {@code true} otherwise.
     */
    private boolean isUtilizationUnderLimitAfterAddingLoad(Broker destinationBroker, double replicaUtilization) {
        final Resource resource = resource();
        final double capacityThreshold = _balancingConstraint.capacityThreshold(resource);

        if (resource.isHostResource()) {
            final double hostUtilization = destinationBroker.host().load().expectedUtilizationFor(resource);
            final double hostLimit = destinationBroker.host().capacityFor(resource) * capacityThreshold;
            if (hostUtilization + replicaUtilization >= hostLimit) {
                return false;
            }
        }

        if (!resource.isBrokerResource()) {
            return true;
        }
        final double brokerUtilization = destinationBroker.load().expectedUtilizationFor(resource);
        final double brokerLimit = destinationBroker.capacityFor(resource) * capacityThreshold;
        return brokerUtilization + replicaUtilization < brokerLimit;
    }
}

