/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptionsGenerator;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.detector.AbstractAnomalyDetector;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectionStatus;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils;
import com.linkedin.kafka.cruisecontrol.detector.GoalViolations;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodic task that runs each configured anomaly detection goal against a cluster model and
 * reports the goals that are violated.
 *
 * <p>A goal is recorded as a fixable violation when optimizing for it produces at least one
 * proposal, and as an unfixable violation when the optimization throws an
 * {@link OptimizationFailureException}. Detected violations are added to the anomaly queue and
 * are also used to refresh the balancedness score exposed by {@link #balancednessScore()}.
 */
public class GoalViolationDetector extends AbstractAnomalyDetector implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(GoalViolationDetector.class);
    /** Score published when no goal is violated; violated goals subtract their cost from it. */
    private static final double MAX_BALANCEDNESS_SCORE = 100.0;
    private final List<Goal> _detectionGoals;
    private ModelGeneration _lastCheckedModelGeneration;
    private final Pattern _excludedTopics;
    private final boolean _allowCapacityEstimation;
    private final boolean _excludeRecentlyDemotedBrokers;
    private final boolean _excludeRecentlyRemovedBrokers;
    private final Map<String, Double> _balancednessCostByGoal;
    private volatile double _balancednessScore;
    private final OptimizationOptionsGenerator _optimizationOptionsGenerator;
    /** Score published when detection is skipped because of offline replicas. */
    protected static final double BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS = -1.0;

    /**
     * Creates the detector and reads its settings from the Cruise Control configuration.
     *
     * @param anomalies queue to which detected goal violations are added
     * @param kafkaCruiseControl the Cruise Control instance used for configuration, load monitoring
     *                           and cluster model generation
     */
    public GoalViolationDetector(Queue<Anomaly> anomalies, KafkaCruiseControl kafkaCruiseControl) {
        super(anomalies, kafkaCruiseControl);
        KafkaCruiseControlConfig config = this._kafkaCruiseControl.config();
        this._detectionGoals = config.getConfiguredInstances("anomaly.detection.goals", Goal.class);
        this._excludedTopics = Pattern.compile(config.getString("topics.excluded.from.partition.movement"));
        this._allowCapacityEstimation = config.getBoolean("anomaly.detection.allow.capacity.estimation");
        this._excludeRecentlyDemotedBrokers = config.getBoolean("self.healing.exclude.recently.demoted.brokers");
        this._excludeRecentlyRemovedBrokers = config.getBoolean("self.healing.exclude.recently.removed.brokers");
        this._balancednessCostByGoal = KafkaCruiseControlUtils.balancednessCostByGoal(
            this._detectionGoals,
            config.getDouble("goal.balancedness.priority.weight"),
            config.getDouble("goal.balancedness.strictness.weight"));
        this._balancednessScore = MAX_BALANCEDNESS_SCORE;
        Map<String, Object> overrideConfigs = new HashMap<>(2);
        overrideConfigs.put("kafka.cruise.control.config.object", config);
        overrideConfigs.put("admin.client.object", this._kafkaCruiseControl.adminClient());
        this._optimizationOptionsGenerator = config.getConfiguredInstance(
            "optimization.options.generator.class", OptimizationOptionsGenerator.class, overrideConfigs);
    }

    /**
     * @return the most recently published balancedness score:
     *         {@link #BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS} if the last detection was skipped due to
     *         offline replicas, otherwise 100.0 minus the cost of every goal found violated.
     */
    public double balancednessScore() {
        return this._balancednessScore;
    }

    /**
     * Decides whether a detection round should run.
     *
     * <p>Detection is skipped when the cluster model generation equals the one checked last time.
     * Otherwise the decision is delegated to {@link AnomalyDetectorUtils#getAnomalyDetectionStatus};
     * if that reports offline replicas, the balancedness score is set accordingly.
     *
     * @return the detection status; only {@link AnomalyDetectionStatus#READY} lets {@link #run()} proceed
     */
    protected AnomalyDetectionStatus getGoalViolationDetectionStatus() {
        // Read the generation once so the logged value is the one that was actually compared.
        ModelGeneration currentGeneration = this._kafkaCruiseControl.loadMonitor().clusterModelGeneration();
        if (currentGeneration.equals(this._lastCheckedModelGeneration)) {
            LOG.debug("Skipping goal violation detection because the model generation hasn't changed. Current model generation {}", currentGeneration);
            return AnomalyDetectionStatus.SKIP_MODEL_GENERATION_NOT_CHANGED;
        }
        AnomalyDetectionStatus detectionStatus = AnomalyDetectorUtils.getAnomalyDetectionStatus(this._kafkaCruiseControl, true);
        if (detectionStatus == AnomalyDetectionStatus.SKIP_HAS_OFFLINE_REPLICAS) {
            this.setBalancednessWithOfflineReplicas();
        }
        return detectionStatus;
    }

    /**
     * Runs one detection round over all configured detection goals.
     *
     * <p>A new cluster model is generated for the first goal and again after any goal that reported a
     * violation (see {@link #optimizeForGoal}); otherwise the model is reused for the next goal.
     * Goals whose completeness requirements are not met are skipped with a warning. All exceptions are
     * logged and swallowed, so this method never throws.
     */
    @Override
    public void run() {
        if (this.getGoalViolationDetectionStatus() != AnomalyDetectionStatus.READY) {
            return;
        }
        AutoCloseable clusterModelSemaphore = null;
        try {
            Map<String, Object> parameterConfigOverrides = new HashMap<>(2);
            parameterConfigOverrides.put("kafka.cruise.control.object", this._kafkaCruiseControl);
            parameterConfigOverrides.put("anomaly.detection.time.ms.object", this._kafkaCruiseControl.timeMs());
            GoalViolations goalViolations = this._kafkaCruiseControl.config()
                .getConfiguredInstance("goal.violations.class", GoalViolations.class, parameterConfigOverrides);
            boolean newModelNeeded = true;
            ClusterModel clusterModel = null;
            // The executor state is only fetched when at least one exclusion option needs it.
            ExecutorState executorState = null;
            if (this._excludeRecentlyDemotedBrokers || this._excludeRecentlyRemovedBrokers) {
                executorState = this._kafkaCruiseControl.executorState();
            }
            Set<Integer> excludedBrokersForLeadership = this._excludeRecentlyDemotedBrokers
                ? executorState.recentlyDemotedBrokers() : Collections.emptySet();
            Set<Integer> excludedBrokersForReplicaMove = this._excludeRecentlyRemovedBrokers
                ? executorState.recentlyRemovedBrokers() : Collections.emptySet();
            for (Goal goal : this._detectionGoals) {
                if (!this._kafkaCruiseControl.loadMonitor().meetCompletenessRequirements(goal.clusterModelCompletenessRequirements())) {
                    LOG.warn("Skipping goal violation detection for {} because load completeness requirement is not met.", goal.name());
                    continue;
                }
                LOG.debug("Detecting if {} is violated.", goal.name());
                if (newModelNeeded) {
                    if (clusterModelSemaphore != null) {
                        clusterModelSemaphore.close();
                        // Clear the reference so the finally block does not close the same semaphore a
                        // second time if the re-acquisition below throws.
                        clusterModelSemaphore = null;
                    }
                    clusterModelSemaphore = this._kafkaCruiseControl.acquireForModelGeneration(new OperationProgress());
                    // Drop the reference to the previous model before building the next one
                    // (presumably so it can be garbage collected while the new model is generated).
                    clusterModel = null;
                    clusterModel = this._kafkaCruiseControl.clusterModel(
                        goal.clusterModelCompletenessRequirements(), this._allowCapacityEstimation, new OperationProgress());
                    if (this.skipDueToOfflineReplicas(clusterModel)) {
                        return;
                    }
                    this._lastCheckedModelGeneration = clusterModel.generation();
                }
                newModelNeeded = this.optimizeForGoal(clusterModel, goal, goalViolations, excludedBrokersForLeadership, excludedBrokersForReplicaMove);
            }
            Map<Boolean, List<String>> violatedGoalsByFixability = goalViolations.violatedGoalsByFixability();
            if (!violatedGoalsByFixability.isEmpty()) {
                this._anomalies.add(goalViolations);
            }
            this.refreshBalancednessScore(violatedGoalsByFixability);
        } catch (NotEnoughValidWindowsException nevwe) {
            LOG.debug("Skipping goal violation detection because there are not enough valid windows.", nevwe);
        } catch (KafkaCruiseControlException kcce) {
            LOG.warn("Goal violation detector received exception", kcce);
        } catch (Exception e) {
            LOG.error("Unexpected exception", e);
        } finally {
            if (clusterModelSemaphore != null) {
                try {
                    clusterModelSemaphore.close();
                } catch (Exception e) {
                    LOG.error("Received exception when closing auto closable semaphore", e);
                }
            }
            LOG.debug("Goal violation detection finished.");
        }
    }

    /**
     * Checks whether detection must be skipped because the model contains dead brokers or brokers with
     * bad disks. When it must, the balancedness score is set to
     * {@link #BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS}.
     *
     * @param clusterModel the freshly generated cluster model
     * @return {@code true} if detection should be skipped, {@code false} otherwise
     */
    protected boolean skipDueToOfflineReplicas(ClusterModel clusterModel) {
        if (!clusterModel.deadBrokers().isEmpty()) {
            LOG.info("Skipping goal violation detection due to dead brokers {}, which are reported by broker failure detector, and fixed if its self healing configuration is enabled.", clusterModel.deadBrokers());
            this.setBalancednessWithOfflineReplicas();
            return true;
        }
        if (!clusterModel.brokersWithBadDisks().isEmpty()) {
            LOG.info("Skipping goal violation detection due to brokers with bad disks {}, which are reported by disk failure detector, and fixed if its self healing configuration is enabled.", clusterModel.brokersWithBadDisks());
            this.setBalancednessWithOfflineReplicas();
            return true;
        }
        return false;
    }

    /** Publishes {@link #BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS} as the balancedness score. */
    protected void setBalancednessWithOfflineReplicas() {
        this._balancednessScore = BALANCEDNESS_SCORE_WITH_OFFLINE_REPLICAS;
    }

    /**
     * Recomputes the balancedness score as 100.0 minus the configured cost of every violated goal.
     *
     * <p>The score is accumulated in a local variable and written to the volatile field exactly once,
     * so callers of {@link #balancednessScore()} never observe a partially computed value, and a failure
     * while computing leaves the previously published score untouched.
     *
     * @param violatedGoalsByFixability names of the violated goals, grouped by whether they are fixable
     */
    protected void refreshBalancednessScore(Map<Boolean, List<String>> violatedGoalsByFixability) {
        double score = MAX_BALANCEDNESS_SCORE;
        for (List<String> violatedGoals : violatedGoalsByFixability.values()) {
            for (String violatedGoal : violatedGoals) {
                score -= this._balancednessCostByGoal.get(violatedGoal);
            }
        }
        this._balancednessScore = score;
    }

    /**
     * @param clusterModel the cluster model whose topics are inspected
     * @return the topics of the model whose full name matches the configured excluded-topics pattern
     */
    protected Set<String> excludedTopics(ClusterModel clusterModel) {
        return clusterModel.topics().stream()
            .filter(topic -> this._excludedTopics.matcher(topic).matches())
            .collect(Collectors.toSet());
    }

    /**
     * Optimizes the given model for a single goal and records a violation if one is found.
     *
     * @param clusterModel the model to optimize; it is mutated by the goal
     * @param goal the goal to check
     * @param goalViolations collector to which a detected violation is added
     * @param excludedBrokersForLeadership brokers passed to the options generator as excluded for leadership
     * @param excludedBrokersForReplicaMove brokers passed to the options generator as excluded for replica moves
     * @return {@code true} if a violation was recorded (the caller then regenerates the model before the
     *         next goal), {@code false} if the model has no topics or the goal produced no proposals
     * @throws KafkaCruiseControlException if the optimization fails for a reason other than
     *         {@link OptimizationFailureException}
     */
    protected boolean optimizeForGoal(ClusterModel clusterModel, Goal goal, GoalViolations goalViolations, Set<Integer> excludedBrokersForLeadership, Set<Integer> excludedBrokersForReplicaMove) throws KafkaCruiseControlException {
        if (clusterModel.topics().isEmpty()) {
            LOG.info("Skipping goal violation detection because the cluster model does not have any topic.");
            return false;
        }
        // Snapshot the distributions before optimizing so the resulting changes can be diffed.
        Map<TopicPartition, List<ReplicaPlacementInfo>> initReplicaDistribution = clusterModel.getReplicaDistribution();
        Map<TopicPartition, ReplicaPlacementInfo> initLeaderDistribution = clusterModel.getLeaderDistribution();
        try {
            goal.optimize(clusterModel,
                          new HashSet<Goal>(),
                          this._optimizationOptionsGenerator.optimizationOptionsForGoalViolationDetection(
                              clusterModel, this.excludedTopics(clusterModel), excludedBrokersForLeadership, excludedBrokersForReplicaMove));
        } catch (OptimizationFailureException ofe) {
            // The goal could not be satisfied: record it as an unfixable violation.
            goalViolations.addViolation(goal.name(), false);
            return true;
        }
        Set<ExecutionProposal> proposals = AnalyzerUtils.getDiff(initReplicaDistribution, initLeaderDistribution, clusterModel);
        LOG.trace("{} generated {} proposals", goal.name(), proposals.size());
        if (!proposals.isEmpty()) {
            // The goal needed changes to be satisfied: record it as a fixable violation.
            goalViolations.addViolation(goal.name(), true);
            return true;
        }
        return false;
    }
}

