package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptionsGenerator;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration;
import com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.class */
public class GoalViolationDetector extends AbstractAnomalyDetector implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(GoalViolationDetector.class);
    private final List<Goal> _detectionGoals;
    private ModelGeneration _lastCheckedModelGeneration;
    private final Pattern _excludedTopics;
    private final boolean _allowCapacityEstimation;
    private final boolean _excludeRecentlyDemotedBrokers;
    private final boolean _excludeRecentlyRemovedBrokers;
    private final Map<String, Double> _balancednessCostByGoal;
    private volatile double _balancednessScore;
    private final OptimizationOptionsGenerator _optimizationOptionsGenerator;
    protected static final double BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS = -1.0d;

    public GoalViolationDetector(Queue<Anomaly> queue, KafkaCruiseControl kafkaCruiseControl) {
        super(queue, kafkaCruiseControl);
        KafkaCruiseControlConfig config = this._kafkaCruiseControl.config();
        this._detectionGoals = config.getConfiguredInstances(AnomalyDetectorConfig.ANOMALY_DETECTION_GOALS_CONFIG, Goal.class);
        this._excludedTopics = Pattern.compile(config.getString(AnalyzerConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG));
        this._allowCapacityEstimation = config.getBoolean(AnomalyDetectorConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG).booleanValue();
        this._excludeRecentlyDemotedBrokers = config.getBoolean(AnomalyDetectorConfig.SELF_HEALING_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG).booleanValue();
        this._excludeRecentlyRemovedBrokers = config.getBoolean(AnomalyDetectorConfig.SELF_HEALING_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG).booleanValue();
        this._balancednessCostByGoal = KafkaCruiseControlUtils.balancednessCostByGoal(this._detectionGoals, config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_PRIORITY_WEIGHT_CONFIG).doubleValue(), config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_CONFIG).doubleValue());
        this._balancednessScore = 100.0d;
        HashMap hashMap = new HashMap(2);
        hashMap.put(KafkaCruiseControlServletUtils.KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG, config);
        hashMap.put(KafkaCruiseControlUtils.ADMIN_CLIENT_CONFIG, this._kafkaCruiseControl.adminClient());
        this._optimizationOptionsGenerator = (OptimizationOptionsGenerator) config.getConfiguredInstance(AnalyzerConfig.OPTIMIZATION_OPTIONS_GENERATOR_CLASS_CONFIG, OptimizationOptionsGenerator.class, hashMap);
    }

    public double balancednessScore() {
        return this._balancednessScore;
    }

    protected AnomalyDetectionStatus getGoalViolationDetectionStatus() {
        if (this._kafkaCruiseControl.loadMonitor().clusterModelGeneration().equals(this._lastCheckedModelGeneration)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping goal violation detection because the model generation hasn't changed. Current model generation {}", this._kafkaCruiseControl.loadMonitor().clusterModelGeneration());
            }
            return AnomalyDetectionStatus.SKIP_MODEL_GENERATION_NOT_CHANGED;
        }
        AnomalyDetectionStatus anomalyDetectionStatus = AnomalyDetectorUtils.getAnomalyDetectionStatus(this._kafkaCruiseControl, true);
        if (anomalyDetectionStatus == AnomalyDetectionStatus.SKIP_HAS_OFFLINE_REPLICAS) {
            setBalancednessWithOfflineReplicas();
        }
        return anomalyDetectionStatus;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (getGoalViolationDetectionStatus() != AnomalyDetectionStatus.READY) {
            return;
        }
        LoadMonitor.AutoCloseableSemaphore autoCloseableSemaphore = null;
        try {
            try {
                try {
                    try {
                        HashMap hashMap = new HashMap(2);
                        hashMap.put(AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, this._kafkaCruiseControl);
                        hashMap.put(AnomalyDetectorUtils.ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, Long.valueOf(this._kafkaCruiseControl.timeMs()));
                        GoalViolations goalViolations = (GoalViolations) this._kafkaCruiseControl.config().getConfiguredInstance(AnomalyDetectorConfig.GOAL_VIOLATIONS_CLASS_CONFIG, GoalViolations.class, hashMap);
                        boolean z = true;
                        ClusterModel clusterModel = null;
                        ExecutorState executorState = (this._excludeRecentlyDemotedBrokers || this._excludeRecentlyRemovedBrokers) ? this._kafkaCruiseControl.executorState() : null;
                        Set<Integer> recentlyDemotedBrokers = this._excludeRecentlyDemotedBrokers ? executorState.recentlyDemotedBrokers() : Collections.emptySet();
                        Set<Integer> recentlyRemovedBrokers = this._excludeRecentlyRemovedBrokers ? executorState.recentlyRemovedBrokers() : Collections.emptySet();
                        for (Goal goal : this._detectionGoals) {
                            if (this._kafkaCruiseControl.loadMonitor().meetCompletenessRequirements(goal.clusterModelCompletenessRequirements())) {
                                LOG.debug("Detecting if {} is violated.", goal.name());
                                if (z) {
                                    if (autoCloseableSemaphore != null) {
                                        autoCloseableSemaphore.close();
                                    }
                                    autoCloseableSemaphore = this._kafkaCruiseControl.acquireForModelGeneration(new OperationProgress());
                                    clusterModel = this._kafkaCruiseControl.clusterModel(goal.clusterModelCompletenessRequirements(), this._allowCapacityEstimation, new OperationProgress());
                                    if (skipDueToOfflineReplicas(clusterModel)) {
                                        if (autoCloseableSemaphore != null) {
                                            try {
                                                autoCloseableSemaphore.close();
                                            } catch (Exception e) {
                                                LOG.error("Received exception when closing auto closable semaphore", e);
                                            }
                                        }
                                        LOG.debug("Goal violation detection finished.");
                                        return;
                                    }
                                    this._lastCheckedModelGeneration = clusterModel.generation();
                                }
                                z = optimizeForGoal(clusterModel, goal, goalViolations, recentlyDemotedBrokers, recentlyRemovedBrokers);
                            } else {
                                LOG.warn("Skipping goal violation detection for {} because load completeness requirement is not met.", goal);
                            }
                        }
                        Map<Boolean, List<String>> violatedGoalsByFixability = goalViolations.violatedGoalsByFixability();
                        if (!violatedGoalsByFixability.isEmpty()) {
                            this._anomalies.add(goalViolations);
                        }
                        refreshBalancednessScore(violatedGoalsByFixability);
                        if (autoCloseableSemaphore != null) {
                            try {
                                autoCloseableSemaphore.close();
                            } catch (Exception e2) {
                                LOG.error("Received exception when closing auto closable semaphore", e2);
                            }
                        }
                        LOG.debug("Goal violation detection finished.");
                    } catch (Exception e3) {
                        LOG.error("Unexpected exception", e3);
                        if (autoCloseableSemaphore != null) {
                            try {
                                autoCloseableSemaphore.close();
                            } catch (Exception e4) {
                                LOG.error("Received exception when closing auto closable semaphore", e4);
                            }
                        }
                        LOG.debug("Goal violation detection finished.");
                    }
                } catch (KafkaCruiseControlException e5) {
                    LOG.warn("Goal violation detector received exception", e5);
                    if (autoCloseableSemaphore != null) {
                        try {
                            autoCloseableSemaphore.close();
                        } catch (Exception e6) {
                            LOG.error("Received exception when closing auto closable semaphore", e6);
                        }
                    }
                    LOG.debug("Goal violation detection finished.");
                }
            } catch (NotEnoughValidWindowsException e7) {
                LOG.debug("Skipping goal violation detection because there are not enough valid windows.", e7);
                if (autoCloseableSemaphore != null) {
                    try {
                        autoCloseableSemaphore.close();
                    } catch (Exception e8) {
                        LOG.error("Received exception when closing auto closable semaphore", e8);
                    }
                }
                LOG.debug("Goal violation detection finished.");
            }
        } catch (Throwable th) {
            if (autoCloseableSemaphore != null) {
                try {
                    autoCloseableSemaphore.close();
                } catch (Exception e9) {
                    LOG.error("Received exception when closing auto closable semaphore", e9);
                }
            }
            LOG.debug("Goal violation detection finished.");
            throw th;
        }
    }

    protected boolean skipDueToOfflineReplicas(ClusterModel clusterModel) {
        if (!clusterModel.deadBrokers().isEmpty()) {
            LOG.info("Skipping goal violation detection due to dead brokers {}, which are reported by broker failure detector, and fixed if its self healing configuration is enabled.", clusterModel.deadBrokers());
            setBalancednessWithOfflineReplicas();
            return true;
        }
        if (clusterModel.brokersWithBadDisks().isEmpty()) {
            return false;
        }
        LOG.info("Skipping goal violation detection due to brokers with bad disks {}, which are reported by disk failure detector, and fixed if its self healing configuration is enabled.", clusterModel.brokersWithBadDisks());
        setBalancednessWithOfflineReplicas();
        return true;
    }

    protected void setBalancednessWithOfflineReplicas() {
        this._balancednessScore = BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS;
    }

    protected void refreshBalancednessScore(Map<Boolean, List<String>> map) {
        this._balancednessScore = 100.0d;
        Iterator<List<String>> it = map.values().iterator();
        while (it.hasNext()) {
            it.next().forEach(str -> {
                this._balancednessScore -= this._balancednessCostByGoal.get(str).doubleValue();
            });
        }
    }

    protected Set<String> excludedTopics(ClusterModel clusterModel) {
        return (Set) clusterModel.topics().stream().filter(str -> {
            return this._excludedTopics.matcher(str).matches();
        }).collect(Collectors.toSet());
    }

    protected boolean optimizeForGoal(ClusterModel clusterModel, Goal goal, GoalViolations goalViolations, Set<Integer> set, Set<Integer> set2) throws KafkaCruiseControlException {
        if (clusterModel.topics().isEmpty()) {
            LOG.info("Skipping goal violation detection because the cluster model does not have any topic.");
            return false;
        }
        Map<TopicPartition, List<ReplicaPlacementInfo>> replicaDistribution = clusterModel.getReplicaDistribution();
        Map<TopicPartition, ReplicaPlacementInfo> leaderDistribution = clusterModel.getLeaderDistribution();
        try {
            goal.optimize(clusterModel, new HashSet(), this._optimizationOptionsGenerator.optimizationOptionsForGoalViolationDetection(clusterModel, excludedTopics(clusterModel), set, set2));
            Set<ExecutionProposal> diff = AnalyzerUtils.getDiff(replicaDistribution, leaderDistribution, clusterModel);
            LOG.trace("{} generated {} proposals", goal.name(), Integer.valueOf(diff.size()));
            if (diff.isEmpty()) {
                return false;
            }
            goalViolations.addViolation(goal.name(), true);
            return true;
        } catch (OptimizationFailureException e) {
            goalViolations.addViolation(goal.name(), false);
            return true;
        }
    }
}
