/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionAbstractGoal;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.common.Statistic;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.model.SortedReplicasHelper;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Goal that balances the number of replicas hosted by each broker.
 *
 * <p>A broker is asked to give replicas away when it hosts offline replicas or when its replica count is above
 * {@code _balanceUpperLimit}; an alive broker is asked to take replicas when its count of online replicas is below
 * {@code _balanceLowerLimit}. Both limits, as well as {@code _fixOfflineReplicasOnly} and the two sets of broker ids
 * used for bookkeeping, are inherited from {@link ReplicaDistributionAbstractGoal}.
 */
public class ReplicaDistributionGoal extends ReplicaDistributionAbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicaDistributionGoal.class);

    /**
     * Creates the goal without a balancing constraint.
     */
    public ReplicaDistributionGoal() {
    }

    /**
     * Creates the goal with the given balancing constraint.
     *
     * @param balancingConstraint the constraint stored in {@code _balancingConstraint}.
     */
    public ReplicaDistributionGoal(BalancingConstraint balancingConstraint) {
        this();
        _balancingConstraint = balancingConstraint;
    }

    /**
     * @param clusterModel the cluster model to inspect.
     * @return the total number of replicas in the cluster model.
     */
    @Override
    int numInterestedReplicas(ClusterModel clusterModel) {
        return clusterModel.numReplicas();
    }

    /**
     * @return the replica balance percentage of the configured balancing constraint.
     */
    @Override
    double balancePercentage() {
        return _balancingConstraint.replicaBalancePercentage();
    }

    /**
     * Decides whether the given action is acceptable for this goal.
     *
     * <p>Replica swaps and leadership movements are always accepted. An inter-broker replica movement is accepted
     * only if the destination passes the upper-limit check for an added replica and the source passes the
     * lower-limit check for a removed replica.
     *
     * @param action the action to evaluate.
     * @param clusterModel the cluster model used to resolve the source and destination brokers.
     * @return {@link ActionAcceptance#ACCEPT} if acceptable, {@link ActionAcceptance#REPLICA_REJECT} otherwise.
     * @throws IllegalArgumentException if the action type is none of the three handled types.
     */
    @Override
    public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
        switch (action.balancingAction()) {
            case INTER_BROKER_REPLICA_SWAP:
            case LEADERSHIP_MOVEMENT:
                return ActionAcceptance.ACCEPT;
            case INTER_BROKER_REPLICA_MOVEMENT: {
                Broker sourceBroker = clusterModel.broker(action.sourceBrokerId());
                Broker destinationBroker = clusterModel.broker(action.destinationBrokerId());
                boolean destinationStaysUnderUpperLimit = isReplicaCountUnderBalanceUpperLimitAfterChange(
                    destinationBroker, destinationBroker.replicas().size(), ReplicaDistributionAbstractGoal.ChangeType.ADD);
                // Short-circuit: the source is only checked when the destination check passed.
                boolean accepted = destinationStaysUnderUpperLimit
                    && isReplicaCountAboveBalanceLowerLimitAfterChange(
                        sourceBroker, sourceBroker.replicas().size(), ReplicaDistributionAbstractGoal.ChangeType.REMOVE);
                return accepted ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            }
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + action.balancingAction() + " is provided.");
        }
    }

    /**
     * @return a new comparator that ranks cluster model stats by the standard deviation of the replica distribution.
     */
    @Override
    public Goal.ClusterModelStatsComparator clusterModelStatsComparator() {
        return new ReplicaDistributionGoalStatsComparator();
    }

    /**
     * @return the simple class name of this goal.
     */
    @Override
    public String name() {
        return ReplicaDistributionGoal.class.getSimpleName();
    }

    /**
     * Runs the parent initialization and then registers, for every broker in the cluster model, the sorted-replica
     * view this goal later iterates over (tracked under {@code GoalUtils.replicaSortName(this, false, false)}).
     *
     * @param clusterModel the cluster model whose brokers get a tracked sorted-replica view.
     * @param optimizationOptions supplies the excluded topics and the only-move-immigrant-replicas flag.
     */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) {
        super.initGoalState(clusterModel, optimizationOptions);
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        // Neither flag depends on the broker being processed, so compute them once.
        boolean onlyMoveImmigrants = optimizationOptions.onlyMoveImmigrantReplicas();
        boolean hasSelfHealingEligibleReplicas = !clusterModel.selfHealingEligibleReplicas().isEmpty();
        for (Broker broker : clusterModel.brokers()) {
            new SortedReplicasHelper()
                .maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrants(), onlyMoveImmigrants)
                .maybeAddSelectionFunc(ReplicaSortFunctionFactory.selectImmigrantOrOfflineReplicas(),
                                       hasSelfHealingEligibleReplicas && broker.isAlive())
                .addSelectionFunc(ReplicaSortFunctionFactory.selectReplicasBasedOnExcludedTopics(excludedTopics))
                .addPriorityFunc(ReplicaSortFunctionFactory.prioritizeOfflineReplicas())
                .maybeAddPriorityFunc(ReplicaSortFunctionFactory.prioritizeImmigrants(), !onlyMoveImmigrants)
                .setScoreFunc(ReplicaSortFunctionFactory.sortByMetricGroupValue(Resource.DISK.name()))
                .trackSortedReplicasFor(GoalUtils.replicaSortName(this, false, false), broker);
        }
    }

    /**
     * Tries to bring the replica count of the given broker within the balance limits by moving replicas out of it
     * and/or into it. Brokers that could not be balanced are recorded in {@code _brokerIdsAboveBalanceUpperLimit} or
     * {@code _brokerIdsUnderBalanceLowerLimit}.
     *
     * @param broker the broker to balance.
     * @param clusterModel the cluster model the broker belongs to.
     * @param optimizedGoals goals passed through to {@code maybeApplyBalancingAction}.
     * @param optimizationOptions options passed through to {@code maybeApplyBalancingAction}.
     */
    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
        LOG.debug("Rebalancing broker {} [limits] lower: {} upper: {}.", broker.id(), _balanceLowerLimit, _balanceUpperLimit);
        int numReplicas = broker.replicas().size();
        int numOfflineReplicas = broker.currentOfflineReplicas().size();
        // Any offline replica has to leave the broker, regardless of the broker's total count.
        boolean requireLessReplicas = numOfflineReplicas > 0 || numReplicas > _balanceUpperLimit;
        // Only alive brokers can receive replicas; offline replicas do not count towards the lower limit.
        boolean requireMoreReplicas = broker.isAlive() && numReplicas - numOfflineReplicas < _balanceLowerLimit;

        if (broker.isAlive() && !requireMoreReplicas && !requireLessReplicas) {
            // Alive broker that is already within the limits.
            return;
        }
        if (!clusterModel.newBrokers().isEmpty() && !broker.isNew() && !requireLessReplicas) {
            // The cluster has new brokers, and this broker is not new and has nothing to give away.
            return;
        }
        boolean selfHealingWithoutOfflineReplicasHere =
            !clusterModel.selfHealingEligibleReplicas().isEmpty() && broker.currentOfflineReplicas().isEmpty();
        if ((selfHealingWithoutOfflineReplicasHere || optimizationOptions.onlyMoveImmigrantReplicas())
            && requireLessReplicas && broker.immigrantReplicas().isEmpty()) {
            // The broker should shed replicas, but none of its replicas are immigrants.
            return;
        }

        // Both helpers return true when they FAILED to reach the target.
        if (requireLessReplicas && rebalanceByMovingReplicasOut(broker, clusterModel, optimizedGoals, optimizationOptions)) {
            _brokerIdsAboveBalanceUpperLimit.add(broker.id());
            LOG.debug("Failed to sufficiently decrease replica count in broker {} with replica movements. Replicas: {}.",
                      broker.id(), broker.replicas().size());
        }
        if (requireMoreReplicas && rebalanceByMovingReplicasIn(broker, clusterModel, optimizedGoals, optimizationOptions)) {
            _brokerIdsUnderBalanceLowerLimit.add(broker.id());
            LOG.debug("Failed to sufficiently increase replica count in broker {} with replica movements. Replicas: {}.",
                      broker.id(), broker.replicas().size());
        }
        if (!_brokerIdsAboveBalanceUpperLimit.contains(broker.id()) && !_brokerIdsUnderBalanceLowerLimit.contains(broker.id())) {
            LOG.debug("Successfully balanced replica count for broker {} by moving replicas. Replicas: {}",
                      broker.id(), broker.replicas().size());
        }
    }

    /**
     * Moves replicas from the given broker to candidate brokers, preferring candidates with fewer replicas.
     *
     * @param broker the broker to move replicas out of.
     * @param clusterModel the cluster model the broker belongs to.
     * @param optimizedGoals goals passed through to {@code maybeApplyBalancingAction}.
     * @param optimizationOptions options passed through to {@code maybeApplyBalancingAction}.
     * @return {@code false} once the broker's replica count is low enough (see the in-line conditions);
     *         otherwise, after every tracked replica has been tried, {@code true} iff the broker still hosts replicas.
     */
    private boolean rebalanceByMovingReplicasOut(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
        // The lambda parameter is typed explicitly: without it the chained thenComparingInt() call prevents
        // the compiler from inferring Broker as the comparator's element type.
        TreeSet<Broker> candidateBrokers = new TreeSet<>(
            Comparator.comparingInt((Broker b) -> b.replicas().size()).thenComparingInt(Broker::id));
        if (_fixOfflineReplicasOnly) {
            candidateBrokers.addAll(clusterModel.aliveBrokers());
        } else {
            clusterModel.aliveBrokers().stream()
                        .filter(aliveBroker -> aliveBroker.replicas().size() < _balanceUpperLimit)
                        .forEach(candidateBrokers::add);
        }

        boolean wasUnableToMoveOfflineReplica = false;
        for (Replica replica : broker.trackedSortedReplicas(GoalUtils.replicaSortName(this, false, false)).sortedReplicas(true)) {
            if (wasUnableToMoveOfflineReplica && !replica.isCurrentOffline() && broker.replicas().size() <= _balanceUpperLimit) {
                // An offline replica could not be moved, we reached the online replicas, and the count is within the limit.
                return false;
            }
            Broker destination = maybeApplyBalancingAction(clusterModel, replica, candidateBrokers,
                                                           ActionType.INTER_BROKER_REPLICA_MOVEMENT, optimizedGoals, optimizationOptions);
            if (destination == null) {
                if (replica.isCurrentOffline()) {
                    wasUnableToMoveOfflineReplica = true;
                }
                continue;
            }
            // While offline replicas remain, the target is an empty broker rather than the upper limit.
            int targetReplicaCount = broker.currentOfflineReplicas().isEmpty() ? _balanceUpperLimit : 0;
            if (broker.replicas().size() <= targetReplicaCount) {
                return false;
            }
            // The destination's replica count -- its sort key -- has just changed, so a comparator-based
            // remove() may no longer locate it in the tree. Remove it by id, then re-insert it so that it is
            // positioned according to its new replica count (unless it is full and must be dropped).
            candidateBrokers.removeIf(candidate -> candidate.id() == destination.id());
            if (destination.replicas().size() < _balanceUpperLimit || _fixOfflineReplicasOnly) {
                candidateBrokers.add(destination);
            }
        }
        return !broker.replicas().isEmpty();
    }

    /**
     * Moves replicas from other brokers into the given alive broker until its replica count reaches
     * {@code _balanceLowerLimit}. Source brokers are visited in order of most offline replicas, then most replicas,
     * then lowest id.
     *
     * @param aliveDestBroker the broker to move replicas into.
     * @param clusterModel the cluster model the broker belongs to.
     * @param optimizedGoals goals passed through to {@code maybeApplyBalancingAction}.
     * @param optimizationOptions options passed through to {@code maybeApplyBalancingAction}.
     * @return {@code false} if the broker reached the lower limit, {@code true} if the source brokers were
     *         exhausted before that happened.
     */
    private boolean rebalanceByMovingReplicasIn(Broker aliveDestBroker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
        PriorityQueue<Broker> eligibleBrokers = new PriorityQueue<>((Broker b1, Broker b2) -> {
            // Descending by offline replica count, then descending by replica count, then ascending by id.
            int resultByOfflineReplicas = Integer.compare(b2.currentOfflineReplicas().size(), b1.currentOfflineReplicas().size());
            if (resultByOfflineReplicas != 0) {
                return resultByOfflineReplicas;
            }
            int resultByAllReplicas = Integer.compare(b2.replicas().size(), b1.replicas().size());
            return resultByAllReplicas != 0 ? resultByAllReplicas : Integer.compare(b1.id(), b2.id());
        });
        if (_fixOfflineReplicasOnly) {
            clusterModel.brokers().stream()
                        .filter(other -> other.id() != aliveDestBroker.id())
                        .forEach(eligibleBrokers::add);
        } else {
            for (Broker other : clusterModel.brokers()) {
                if (other.replicas().size() > _balanceLowerLimit || !other.currentOfflineReplicas().isEmpty()) {
                    eligibleBrokers.add(other);
                }
            }
        }

        List<Broker> candidateBrokers = Collections.singletonList(aliveDestBroker);
        while (!eligibleBrokers.isEmpty()) {
            Broker sourceBroker = eligibleBrokers.poll();
            for (Replica replica : sourceBroker.trackedSortedReplicas(GoalUtils.replicaSortName(this, false, false)).sortedReplicas(true)) {
                Broker destination = maybeApplyBalancingAction(clusterModel, replica, candidateBrokers,
                                                               ActionType.INTER_BROKER_REPLICA_MOVEMENT, optimizedGoals, optimizationOptions);
                if (destination == null) {
                    continue;
                }
                if (aliveDestBroker.replicas().size() >= _balanceLowerLimit) {
                    return false;
                }
                if (eligibleBrokers.isEmpty()) {
                    continue;
                }
                // If the head of the queue now outranks the source broker (more offline replicas, or the same
                // number of offline replicas but more replicas overall), re-enqueue the source and switch to the head.
                Broker nextBroker = eligibleBrokers.peek();
                int resultByOfflineReplicas = Integer.compare(sourceBroker.currentOfflineReplicas().size(),
                                                              nextBroker.currentOfflineReplicas().size());
                if (resultByOfflineReplicas < 0
                    || (resultByOfflineReplicas == 0 && sourceBroker.replicas().size() < nextBroker.replicas().size())) {
                    eligibleBrokers.add(sourceBroker);
                    break;
                }
            }
        }
        return true;
    }

    /**
     * Compares two cluster model stats by the standard deviation of their replica distribution and remembers an
     * explanation whenever the comparison result is negative.
     */
    private class ReplicaDistributionGoalStatsComparator implements Goal.ClusterModelStatsComparator {
        private String _reasonForLastNegativeResult;

        private ReplicaDistributionGoalStatsComparator() {
        }

        /**
         * @param stats1 the first stats; reported as "post-optimization" in the explanation.
         * @param stats2 the second stats; reported as "pre-optimization" in the explanation.
         * @return the result of {@code AnalyzerUtils.compare(stDev2, stDev1, 1.0E-5)}, where {@code stDev1} and
         *         {@code stDev2} are the replica-count standard deviations of {@code stats1} and {@code stats2}.
         */
        @Override
        public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
            double stDev1 = stats1.replicaStats().get(Statistic.ST_DEV).doubleValue();
            double stDev2 = stats2.replicaStats().get(Statistic.ST_DEV).doubleValue();
            int result = AnalyzerUtils.compare(stDev2, stDev1, 1.0E-5);
            if (result < 0) {
                _reasonForLastNegativeResult = String.format(
                    "Violated %s. [Std Deviation of Replica Distribution] post-optimization:%.3f pre-optimization:%.3f",
                    name(), stDev1, stDev2);
            }
            return result;
        }

        /**
         * @return the explanation recorded by the most recent negative comparison, or {@code null} if no
         *         comparison has been negative so far.
         */
        @Override
        public String explainLastComparison() {
            return _reasonForLastNegativeResult;
        }
    }
}

