/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.common.Statistic;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.model.SortedReplicasHelper;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Soft goal ({@link #isHardGoal()} returns {@code false}) that looks at the leadership
 * {@link Resource#NW_IN} load ("leader bytes in") of each broker and tries to bring brokers that are
 * above a per-broker balance threshold back under it by moving leadership to online followers.
 *
 * <p>The balance threshold for a broker is the larger of
 * <ul>
 *   <li>the mean leader bytes in over all alive brokers multiplied by
 *       {@code resourceBalancePercentage(NW_IN)}, and</li>
 *   <li>{@code lowUtilizationThreshold(NW_IN)} multiplied by the broker's {@code NW_IN} capacity.</li>
 * </ul>
 */
public class LeaderBytesInDistributionGoal extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(LeaderBytesInDistributionGoal.class);

    /**
     * Cached mean leader bytes in over alive brokers. The value {@code 0.0} doubles as the
     * "not computed yet" marker: it is reset to 0.0 in {@link #initGoalState} and lazily filled by
     * {@link #initMeanLeaderBytesIn}.
     */
    private double _meanLeaderBytesIn;

    /**
     * Ids of brokers that were still over their balance threshold after {@link #rebalanceForBroker}.
     * Initialised eagerly so that {@link #finish()} is safe to call before {@link #initGoalState}.
     */
    private final Set<Integer> _overLimitBrokerIds = new HashSet<>();

    /** Creates the goal without a balancing constraint. */
    public LeaderBytesInDistributionGoal() {
    }

    /**
     * Creates the goal with the given balancing constraint.
     *
     * @param balancingConstraint constraint stored in the inherited {@code _balancingConstraint} field.
     */
    LeaderBytesInDistributionGoal(BalancingConstraint balancingConstraint) {
        this._balancingConstraint = balancingConstraint;
    }

    /**
     * Decides whether the given action keeps the leader bytes in of the affected brokers at or below
     * their balance thresholds.
     *
     * @param action the action to evaluate.
     * @param clusterModel the cluster model the action would be applied to.
     * @return {@link ActionAcceptance#ACCEPT} when no leadership load is transferred or when the
     *         resulting leader bytes in does not exceed the relevant threshold(s);
     *         {@link ActionAcceptance#REPLICA_REJECT} otherwise.
     * @throws IllegalStateException if a leadership movement is requested for a follower replica.
     * @throws IllegalArgumentException if the action type is not one of the handled types.
     */
    @Override
    public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
        Replica sourceReplica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
        Broker destinationBroker = clusterModel.broker(action.destinationBrokerId());
        initMeanLeaderBytesIn(clusterModel);

        if (!sourceReplica.isLeader()) {
            switch (action.balancingAction()) {
                case INTER_BROKER_REPLICA_SWAP:
                    if (!destinationBroker.replica(action.destinationTopicPartition()).isLeader()) {
                        // Follower swapped with follower: nothing to check.
                        return ActionAcceptance.ACCEPT;
                    }
                    // Destination replica is a leader: fall through to the utilization checks below.
                    break;
                case INTER_BROKER_REPLICA_MOVEMENT:
                    // Moving a follower is always accepted by this goal.
                    return ActionAcceptance.ACCEPT;
                case LEADERSHIP_MOVEMENT:
                    throw new IllegalStateException("Attempt to move leadership from the follower.");
                default:
                    throw new IllegalArgumentException("Unsupported balancing action " + action.balancingAction() + " is provided.");
            }
        }

        double sourceReplicaUtilization = sourceReplica.load().expectedUtilizationFor(Resource.NW_IN);
        double newDestLeaderBytesIn;
        switch (action.balancingAction()) {
            case INTER_BROKER_REPLICA_SWAP: {
                double destinationReplicaUtilization = destinationBroker.replica(action.destinationTopicPartition())
                        .load().expectedUtilizationFor(Resource.NW_IN);
                newDestLeaderBytesIn = destinationBroker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN)
                        + sourceReplicaUtilization - destinationReplicaUtilization;

                // A swap also changes the source broker, so it has to stay within its own threshold too.
                Broker sourceBroker = clusterModel.broker(action.sourceBrokerId());
                double newSourceLeaderBytesIn = sourceBroker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN)
                        + destinationReplicaUtilization - sourceReplicaUtilization;
                if (newSourceLeaderBytesIn > balanceThreshold(clusterModel, sourceBroker.id())) {
                    return ActionAcceptance.REPLICA_REJECT;
                }
                break;
            }
            case INTER_BROKER_REPLICA_MOVEMENT:
            case LEADERSHIP_MOVEMENT:
                newDestLeaderBytesIn = destinationBroker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN)
                        + sourceReplicaUtilization;
                break;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + action.balancingAction() + " is provided.");
        }

        if (newDestLeaderBytesIn > balanceThreshold(clusterModel, destinationBroker.id())) {
            return ActionAcceptance.REPLICA_REJECT;
        }
        return ActionAcceptance.ACCEPT;
    }

    /**
     * @return a new comparator instance bound to this goal.
     */
    @Override
    public Goal.ClusterModelStatsComparator clusterModelStatsComparator() {
        return new LeaderBytesInDistributionGoalStatsComparator();
    }

    /**
     * @return requirements built from {@code max(1, _numWindows / 14)}, the inherited
     *         {@code _minMonitoredPartitionPercentage}, and {@code false} as the third argument.
     */
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(Math.max(1, this._numWindows / 14), this._minMonitoredPartitionPercentage, false);
    }

    /**
     * @return the simple class name of this goal.
     */
    @Override
    public String name() {
        return LeaderBytesInDistributionGoal.class.getSimpleName();
    }

    /**
     * @return {@code false}; this goal is not a hard goal.
     */
    @Override
    public boolean isHardGoal() {
        return false;
    }

    /**
     * Selects the brokers whose leader bytes in is strictly above their balance threshold.
     *
     * @param clusterModel the cluster model to inspect.
     * @return the set obtained from {@code clusterModel.brokers()} with every broker at or below its
     *         threshold removed from it.
     */
    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        SortedSet<Broker> brokersToBalance = clusterModel.brokers();
        brokersToBalance.removeIf(broker ->
                broker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN)
                        <= balanceThreshold(clusterModel, broker.id()));
        return brokersToBalance;
    }

    /**
     * Checks that a leadership movement generated by this goal is acceptable to the goal itself.
     *
     * @param clusterModel the cluster model the action would be applied to.
     * @param action the action to check; must be a leadership movement.
     * @return {@code true} if {@link #actionAcceptance} returns {@link ActionAcceptance#ACCEPT}.
     * @throws IllegalStateException if the action is not a leadership movement.
     */
    @Override
    protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction action) {
        if (action.balancingAction() != ActionType.LEADERSHIP_MOVEMENT) {
            throw new IllegalStateException("Found balancing action " + action.balancingAction() + " but expected leadership movement.");
        }
        return actionAcceptance(action, clusterModel) == ActionAcceptance.ACCEPT;
    }

    /**
     * Resets the cached mean and the over-limit broker set, and registers the sorted-replica tracking
     * (leaders only, filtered by the excluded topics, scored by reverse {@code NW_IN} metric group
     * value) under the name {@code GoalUtils.replicaSortName(this, true, true)}.
     *
     * @param clusterModel the cluster model to track sorted replicas for.
     * @param optimizationOptions source of the excluded topics.
     */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) {
        this._meanLeaderBytesIn = 0.0;
        this._overLimitBrokerIds.clear();
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        new SortedReplicasHelper()
                .addSelectionFunc(ReplicaSortFunctionFactory.selectLeaders())
                .addSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(excludedTopics))
                .setScoreFunc(ReplicaSortFunctionFactory.reverseSortByMetricGroupValue(Resource.NW_IN.toString()))
                .trackSortedReplicasFor(GoalUtils.replicaSortName(this, true, true), clusterModel);
    }

    /**
     * Marks the goal as finished and forgets the recorded over-limit brokers.
     */
    @Override
    public void finish() {
        this._finished = true;
        this._overLimitBrokerIds.clear();
    }

    /**
     * Flags the goal as not succeeded (and logs a warning) when some brokers remained over their
     * threshold, then finishes the goal.
     *
     * @param clusterModel unused here.
     * @param optimizationOptions unused here.
     */
    @Override
    protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) {
        if (!this._overLimitBrokerIds.isEmpty()) {
            LOG.warn("There were still {} brokers over the limit.", this._overLimitBrokerIds.size());
            this._succeeded = false;
        }
        finish();
    }

    /**
     * Moves leadership away from the given broker, one tracked leader replica at a time, until the
     * broker's leader bytes in is no longer above its balance threshold or no tracked leader is left.
     * Candidate destinations for each leader are the brokers of the partition's online followers,
     * ordered by ascending leader bytes in. A broker that is still over the threshold afterwards is
     * recorded in the over-limit set.
     *
     * @param broker the broker to rebalance.
     * @param clusterModel the cluster model being optimized.
     * @param optimizedGoals goals passed through to {@code maybeApplyBalancingAction}.
     * @param optimizationOptions options passed through to {@code maybeApplyBalancingAction}.
     */
    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
        double balanceThreshold = balanceThreshold(clusterModel, broker.id());
        // "<=" keeps this guard consistent with brokersToBalance() and with the loop condition below:
        // a broker sitting exactly on the threshold is balanced and must not lose leadership.
        if (broker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN) <= balanceThreshold) {
            return;
        }

        boolean overThreshold = true;
        Iterator<Replica> leaderReplicaIt =
                broker.trackedSortedReplicas(GoalUtils.replicaSortName(this, true, true)).sortedReplicas(true).iterator();
        while (overThreshold && leaderReplicaIt.hasNext()) {
            Replica leaderReplica = leaderReplicaIt.next();
            List<Replica> onlineFollowers = clusterModel.partition(leaderReplica.topicPartition()).onlineFollowers();
            List<Broker> eligibleBrokers = onlineFollowers.stream()
                    .map(Replica::broker)
                    .sorted(Comparator.comparingDouble(a -> a.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN)))
                    .collect(Collectors.toList());
            maybeApplyBalancingAction(clusterModel, leaderReplica, eligibleBrokers, ActionType.LEADERSHIP_MOVEMENT, optimizedGoals, optimizationOptions);
            overThreshold = broker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN) > balanceThreshold;
        }
        if (overThreshold) {
            this._overLimitBrokerIds.add(broker.id());
        }
    }

    /**
     * Computes and caches the mean leader bytes in when the cached value is still {@code 0.0}.
     *
     * @param clusterModel the cluster model whose brokers are averaged.
     */
    private void initMeanLeaderBytesIn(ClusterModel clusterModel) {
        if (this._meanLeaderBytesIn == 0.0) {
            this._meanLeaderBytesIn = meanLeaderResourceUtilization(clusterModel.brokers(), Resource.NW_IN);
        }
    }

    /**
     * Averages the leadership load of the given resource over the alive brokers.
     *
     * @param brokers brokers to consider; brokers for which {@code isAlive()} is false are skipped.
     * @param resource the resource whose expected utilization is averaged.
     * @return the mean utilization, or {@code 0.0} when there is no alive broker (instead of the
     *         {@code NaN} a 0/0 division would produce, which would make every threshold comparison false).
     */
    private static double meanLeaderResourceUtilization(Collection<Broker> brokers, Resource resource) {
        double accumulator = 0.0;
        int brokerCount = 0;
        for (Broker broker : brokers) {
            if (!broker.isAlive()) {
                continue;
            }
            accumulator += broker.leadershipLoadForNwResources().expectedUtilizationFor(resource);
            ++brokerCount;
        }
        if (brokerCount == 0) {
            return 0.0;
        }
        return accumulator / (double) brokerCount;
    }

    /**
     * Computes the leader bytes in limit for a broker.
     *
     * @param clusterModel the cluster model (used for the mean and for the broker's capacity).
     * @param brokerId id of the broker the threshold is computed for.
     * @return the larger of {@code mean * resourceBalancePercentage(NW_IN)} and
     *         {@code lowUtilizationThreshold(NW_IN) * capacityFor(NW_IN)} of the broker.
     */
    private double balanceThreshold(ClusterModel clusterModel, int brokerId) {
        initMeanLeaderBytesIn(clusterModel);
        double lowUtilizationThreshold = this._balancingConstraint.lowUtilizationThreshold(Resource.NW_IN)
                * clusterModel.broker(brokerId).capacityFor(Resource.NW_IN);
        double meanBasedThreshold = this._meanLeaderBytesIn * this._balancingConstraint.resourceBalancePercentage(Resource.NW_IN);
        return Math.max(meanBasedThreshold, lowUtilizationThreshold);
    }

    /**
     * Comparator over cluster model stats based on the {@link Resource#NW_IN} utilization statistics.
     * It is an inner (non-static) class because it reads the enclosing goal's balancing constraint.
     */
    private class LeaderBytesInDistributionGoalStatsComparator
            implements Goal.ClusterModelStatsComparator {
        /** Explanation recorded the last time {@link #compare} returned a negative value. */
        private String _reasonForLastNegativeResult;

        private LeaderBytesInDistributionGoalStatsComparator() {
        }

        /**
         * Returns 1 when the maximum {@code NW_IN} utilization in {@code stats1} is at or below
         * {@code AVG(stats1) * resourceBalancePercentage(NW_IN)}; otherwise compares the square roots
         * of the {@code ST_DEV} values of {@code stats2} and {@code stats1} (in that order) through
         * {@code AnalyzerUtils.compare}.
         *
         * @param stats1 first stats.
         * @param stats2 second stats.
         * @return 1, or the result of {@code AnalyzerUtils.compare}; a negative result also records
         *         an explanation retrievable via {@link #explainLastComparison()}.
         */
        @Override
        public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
            double meanPreLeaderBytesIn = stats1.resourceUtilizationStats().get(Statistic.AVG).get(Resource.NW_IN);
            double threshold = meanPreLeaderBytesIn * LeaderBytesInDistributionGoal.this._balancingConstraint.resourceBalancePercentage(Resource.NW_IN);
            if (stats1.resourceUtilizationStats().get(Statistic.MAX).get(Resource.NW_IN) <= threshold) {
                return 1;
            }

            // NOTE(review): these are read from the ST_DEV statistic but named/reported as "variance"
            // and then passed through Math.sqrt; kept as-is, confirm the intended unit.
            double variance1 = stats1.resourceUtilizationStats().get(Statistic.ST_DEV).get(Resource.NW_IN);
            double variance2 = stats2.resourceUtilizationStats().get(Statistic.ST_DEV).get(Resource.NW_IN);
            int result = AnalyzerUtils.compare(Math.sqrt(variance2), Math.sqrt(variance1), Resource.NW_IN);
            if (result < 0) {
                this._reasonForLastNegativeResult = String.format("Violated leader bytes in balancing. preVariance: %.3f postVariance: %.3f.", variance2, variance1);
            }
            return result;
        }

        /**
         * @return the explanation stored by the most recent negative {@link #compare} result, or
         *         {@code null} if there has been none.
         */
        @Override
        public String explainLastComparison() {
            return this._reasonForLastNegativeResult;
        }
    }
}

