/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.model.SortedReplicasHelper;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PotentialNwOutGoal
extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(PotentialNwOutGoal.class);
    private boolean _fixOfflineReplicasOnly;

    public PotentialNwOutGoal() {
    }

    PotentialNwOutGoal(BalancingConstraint constraint) {
        this._balancingConstraint = constraint;
    }

    @Override
    public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
        switch (action.balancingAction()) {
            case LEADERSHIP_MOVEMENT: {
                return ActionAcceptance.ACCEPT;
            }
            case INTER_BROKER_REPLICA_SWAP: 
            case INTER_BROKER_REPLICA_MOVEMENT: {
                return this.isReplicaRelocationAcceptable(action, clusterModel);
            }
        }
        throw new IllegalArgumentException("Unsupported balancing action " + action.balancingAction() + " is provided.");
    }

    private ActionAcceptance isReplicaRelocationAcceptable(BalancingAction action, ClusterModel clusterModel) {
        if (this.selfSatisfied(clusterModel, action)) {
            return ActionAcceptance.ACCEPT;
        }
        Replica replica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
        double destinationBrokerUtilization = clusterModel.potentialLeadershipLoadFor(clusterModel.broker(action.destinationBrokerId()).id()).expectedUtilizationFor(Resource.NW_OUT);
        double sourceBrokerUtilization = clusterModel.potentialLeadershipLoadFor(replica.broker().id()).expectedUtilizationFor(Resource.NW_OUT);
        double sourceReplicaUtilization = clusterModel.partition(replica.topicPartition()).leader().load().expectedUtilizationFor(Resource.NW_OUT);
        double maxUtilization = Math.max(destinationBrokerUtilization, sourceBrokerUtilization);
        switch (action.balancingAction()) {
            case INTER_BROKER_REPLICA_SWAP: {
                double destinationReplicaUtilization = clusterModel.partition(action.destinationTopicPartition()).leader().load().expectedUtilizationFor(Resource.NW_OUT);
                if (sourceBrokerUtilization + destinationReplicaUtilization - sourceReplicaUtilization > maxUtilization) {
                    return ActionAcceptance.REPLICA_REJECT;
                }
                return destinationBrokerUtilization + sourceReplicaUtilization - destinationReplicaUtilization <= maxUtilization ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            }
            case INTER_BROKER_REPLICA_MOVEMENT: {
                return destinationBrokerUtilization + sourceReplicaUtilization <= maxUtilization ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            }
        }
        throw new IllegalArgumentException("Unsupported balancing action " + action.balancingAction() + " is provided.");
    }

    @Override
    public Goal.ClusterModelStatsComparator clusterModelStatsComparator() {
        return new PotentialNwOutGoalStatsComparator();
    }

    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(Math.max(1, this._numWindows / 14), this._minMonitoredPartitionPercentage, false);
    }

    @Override
    public String name() {
        return PotentialNwOutGoal.class.getSimpleName();
    }

    @Override
    public boolean isHardGoal() {
        return false;
    }

    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        SortedSet<Broker> brokenBrokers = clusterModel.deadBrokers();
        brokenBrokers.addAll(clusterModel.brokersHavingOfflineReplicasOnBadDisks());
        return brokenBrokers.isEmpty() ? clusterModel.brokers() : brokenBrokers;
    }

    @Override
    protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction action) {
        Replica sourceReplica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
        ActionType actionType = action.balancingAction();
        Broker sourceBroker = sourceReplica.broker();
        if (this._fixOfflineReplicasOnly && sourceReplica.isCurrentOffline()) {
            return action.balancingAction() == ActionType.INTER_BROKER_REPLICA_MOVEMENT;
        }
        Broker destinationBroker = clusterModel.broker(action.destinationBrokerId());
        double destinationBrokerUtilization = clusterModel.potentialLeadershipLoadFor(destinationBroker.id()).expectedUtilizationFor(Resource.NW_OUT);
        double destinationCapacity = destinationBroker.capacityFor(Resource.NW_OUT) * this._balancingConstraint.capacityThreshold(Resource.NW_OUT);
        double sourceReplicaUtilization = clusterModel.partition(sourceReplica.topicPartition()).leader().load().expectedUtilizationFor(Resource.NW_OUT);
        if (actionType != ActionType.INTER_BROKER_REPLICA_SWAP) {
            return destinationCapacity >= destinationBrokerUtilization + sourceReplicaUtilization;
        }
        double destinationReplicaUtilization = clusterModel.partition(action.destinationTopicPartition()).leader().load().expectedUtilizationFor(Resource.NW_OUT);
        if (destinationCapacity < destinationBrokerUtilization + sourceReplicaUtilization - destinationReplicaUtilization) {
            return false;
        }
        double sourceBrokerUtilization = clusterModel.potentialLeadershipLoadFor(sourceBroker.id()).expectedUtilizationFor(Resource.NW_OUT);
        double sourceCapacity = sourceBroker.capacityFor(Resource.NW_OUT) * this._balancingConstraint.capacityThreshold(Resource.NW_OUT);
        return sourceCapacity >= sourceBrokerUtilization + destinationReplicaUtilization - sourceReplicaUtilization;
    }

    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) {
        this._fixOfflineReplicasOnly = false;
        new SortedReplicasHelper().maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrants(), optimizationOptions.onlyMoveImmigrantReplicas()).addSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(optimizationOptions.excludedTopics())).trackSortedReplicasFor(GoalUtils.replicaSortName(this, false, false), clusterModel);
    }

    @Override
    protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        try {
            GoalUtils.ensureNoOfflineReplicas(clusterModel, this.name());
        }
        catch (OptimizationFailureException ofe) {
            if (this._fixOfflineReplicasOnly) {
                throw ofe;
            }
            this._fixOfflineReplicasOnly = true;
            LOG.warn("Ignoring potential network outbound limit to move offline replicas from dead brokers/disks.");
            return;
        }
        GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, this.name());
        this.finish();
    }

    @Override
    public void finish() {
        this._finished = true;
    }

    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
        boolean estimatedMaxPossibleNwOutOverLimit;
        double capacityThreshold = this._balancingConstraint.capacityThreshold(Resource.NW_OUT);
        double capacityLimit = broker.capacityFor(Resource.NW_OUT) * capacityThreshold;
        boolean bl = estimatedMaxPossibleNwOutOverLimit = !broker.replicas().isEmpty() && clusterModel.potentialLeadershipLoadFor(broker.id()).expectedUtilizationFor(Resource.NW_OUT) > capacityLimit;
        if (!(estimatedMaxPossibleNwOutOverLimit || this._fixOfflineReplicasOnly && !broker.currentOfflineReplicas().isEmpty())) {
            return;
        }
        Set<Broker> candidateBrokers = this._fixOfflineReplicasOnly ? clusterModel.aliveBrokers() : this.brokersUnderEstimatedMaxPossibleNwOut(clusterModel);
        for (Replica replica : broker.trackedSortedReplicas(GoalUtils.replicaSortName(this, false, false)).sortedReplicas(true)) {
            double destCapacityLimit;
            double updatedDestBrokerPotentialNwOut;
            ArrayList<Broker> eligibleBrokers = new ArrayList<Broker>(candidateBrokers);
            eligibleBrokers.removeAll(clusterModel.partition(replica.topicPartition()).partitionBrokers());
            eligibleBrokers.sort((b1, b2) -> Double.compare(b2.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_OUT), b1.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_OUT)));
            Broker destinationBroker = this.maybeApplyBalancingAction(clusterModel, replica, eligibleBrokers, ActionType.INTER_BROKER_REPLICA_MOVEMENT, optimizedGoals, optimizationOptions);
            if (destinationBroker == null) continue;
            int destinationBrokerId = destinationBroker.id();
            boolean bl2 = estimatedMaxPossibleNwOutOverLimit = !broker.replicas().isEmpty() && clusterModel.potentialLeadershipLoadFor(broker.id()).expectedUtilizationFor(Resource.NW_OUT) > capacityLimit;
            if (!estimatedMaxPossibleNwOutOverLimit && (!this._fixOfflineReplicasOnly || broker.currentOfflineReplicas().isEmpty())) break;
            if (this._fixOfflineReplicasOnly || !((updatedDestBrokerPotentialNwOut = clusterModel.potentialLeadershipLoadFor(destinationBrokerId).expectedUtilizationFor(Resource.NW_OUT)) > (destCapacityLimit = destinationBroker.capacityFor(Resource.NW_OUT) * capacityThreshold))) continue;
            candidateBrokers.remove(clusterModel.broker(destinationBrokerId));
        }
        if (estimatedMaxPossibleNwOutOverLimit) {
            LOG.warn("Violated estimated max possible network out limit for broker id:{} limit:{} utilization:{}.", new Object[]{broker.id(), capacityLimit, clusterModel.potentialLeadershipLoadFor(broker.id()).expectedUtilizationFor(Resource.NW_OUT)});
            this._succeeded = false;
        }
    }

    private Set<Broker> brokersUnderEstimatedMaxPossibleNwOut(ClusterModel clusterModel) {
        HashSet<Broker> brokersUnderEstimatedMaxPossibleNwOut = new HashSet<Broker>();
        double capacityThreshold = this._balancingConstraint.capacityThreshold(Resource.NW_OUT);
        for (Broker aliveBroker : clusterModel.aliveBrokers()) {
            double capacityLimit = aliveBroker.capacityFor(Resource.NW_OUT) * capacityThreshold;
            if (!(clusterModel.potentialLeadershipLoadFor(aliveBroker.id()).expectedUtilizationFor(Resource.NW_OUT) < capacityLimit)) continue;
            brokersUnderEstimatedMaxPossibleNwOut.add(aliveBroker);
        }
        return brokersUnderEstimatedMaxPossibleNwOut;
    }

    private class PotentialNwOutGoalStatsComparator
    implements Goal.ClusterModelStatsComparator {
        private String _reasonForLastNegativeResult;

        private PotentialNwOutGoalStatsComparator() {
        }

        @Override
        public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
            int stat2;
            int stat1 = stats1.numBrokersUnderPotentialNwOut();
            int result = Integer.compare(stat1, stat2 = stats2.numBrokersUnderPotentialNwOut());
            if (result < 0) {
                this._reasonForLastNegativeResult = String.format("Violated %s. [Number of brokers under potential NwOut] post-optimization:%d pre-optimization:%d", PotentialNwOutGoal.this.name(), stat1, stat2);
            }
            return result;
        }

        @Override
        public String explainLastComparison() {
            return this._reasonForLastNegativeResult;
        }
    }
}

