package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomaly;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyFinder;
import com.linkedin.cruisecontrol.detector.metricanomaly.PercentileMetricAnomalyFinderUtils;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.AggregatedMetricValues;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.constants.AnomalyDetectorConfig;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerEntity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.commons.math3.stat.descriptive.rank.Percentile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/SlowBrokerFinder.class */
/**
 * A {@link MetricAnomalyFinder} that looks for brokers whose log flush time is abnormally high.
 *
 * <p>For every broker serving at least {@code slow.broker.bytes.in.rate.detection.threshold} of total
 * (leader + replication) bytes-in, two metrics are examined: the 99.9th percentile log flush time, and the
 * same value divided by the broker's total bytes-in rate. A broker is reported by the detection step only if
 * <ul>
 *   <li>its log flush time is anomalous compared with its own history or with its peers, and</li>
 *   <li>its per-byte log flush time is anomalous compared with its own history or with its peers, and</li>
 *   <li>its log flush time exceeds {@code slow.broker.log.flush.time.threshold.ms}.</li>
 * </ul>
 *
 * <p>Each detection round raises the slowness score of the reported brokers by one (capped at the decommission
 * score) and lowers the score of every other tracked broker by one; a broker whose score drops to zero is
 * forgotten. Reported brokers whose score has reached the demotion or decommission score are packaged into
 * {@link SlowBrokers} anomalies.
 *
 * <p>This class keeps mutable per-broker state and a shared {@link Percentile} instance without any
 * synchronization.
 */
public class SlowBrokerFinder implements MetricAnomalyFinder<BrokerEntity> {
    public static final String SELF_HEALING_SLOW_BROKER_REMOVAL_ENABLED_CONFIG = "self.healing.slow.broker.removal.enabled";
    public static final String REMOVE_SLOW_BROKER_CONFIG = "remove.slow.broker";
    public static final String SLOW_BROKER_BYTES_IN_RATE_DETECTION_THRESHOLD_CONFIG = "slow.broker.bytes.in.rate.detection.threshold";
    public static final double DEFAULT_SLOW_BROKER_BYTES_IN_RATE_DETECTION_THRESHOLD = 1024.0d;
    public static final String SLOW_BROKER_LOG_FLUSH_TIME_THRESHOLD_MS_CONFIG = "slow.broker.log.flush.time.threshold.ms";
    public static final double DEFAULT_SLOW_BROKER_LOG_FLUSH_TIME_THRESHOLD_MS_CONFIG = 150.0d;
    public static final String SLOW_BROKER_METRIC_HISTORY_PERCENTILE_THRESHOLD_CONFIG = "slow.broker.metric.history.percentile.threshold";
    public static final double DEFAULT_SLOW_BROKER_METRIC_HISTORY_PERCENTILE_THRESHOLD = 90.0d;
    public static final String SLOW_BROKER_METRIC_HISTORY_MARGIN_CONFIG = "slow.broker.metric.history.margin";
    public static final double DEFAULT_SLOW_BROKER_METRIC_HISTORY_MARGIN = 3.0d;
    public static final String SLOW_BROKER_PEER_METRIC_PERCENTILE_THRESHOLD_CONFIG = "slow.broker.peer.metric.percentile.threshold";
    public static final double DEFAULT_SLOW_BROKER_PEER_METRIC_PERCENTILE_THRESHOLD = 50.0d;
    public static final String SLOW_BROKER_PEER_METRIC_MARGIN_CONFIG = "slow.broker.peer.metric.margin";
    public static final double DEFAULT_SLOW_BROKER_PEER_METRIC_MARGIN = 3.0d;
    public static final String SLOW_BROKER_DEMOTION_SCORE_CONFIG = "slow.broker.demotion.score";
    public static final int DEFAULT_SLOW_BROKER_DEMOTION_SCORE = 5;
    public static final String SLOW_BROKER_DECOMMISSION_SCORE_CONFIG = "slow.broker.decommission.score";
    public static final int DEFAULT_SLOW_BROKER_DECOMMISSION_SCORE = 50;
    public static final String SLOW_BROKER_SELF_HEALING_UNFIXABLE_RATIO_CONFIG = "slow.broker.self.healing.unfixable.ratio";
    private static final double DEFAULT_SLOW_BROKER_SELF_HEALING_UNFIXABLE_RATIO = 0.1d;

    /**
     * Historical log flush time samples at or below this value are left out of the history baseline.
     * NOTE(review): presumably this discards samples taken while the broker was nearly idle -- confirm.
     */
    private static final double HISTORICAL_LOG_FLUSH_TIME_FLOOR = 5.0d;

    private static final Logger LOG = LoggerFactory.getLogger(SlowBrokerFinder.class);
    private static final short BROKER_LOG_FLUSH_TIME_MS_999TH_ID = KafkaMetricDef.brokerMetricDef().metricInfo(KafkaMetricDef.BROKER_LOG_FLUSH_TIME_MS_999TH.name()).id();
    private static final short LEADER_BYTES_IN_ID = KafkaMetricDef.brokerMetricDef().metricInfo(KafkaMetricDef.LEADER_BYTES_IN.name()).id();
    private static final short REPLICATION_BYTES_IN_RATE_ID = KafkaMetricDef.brokerMetricDef().metricInfo(KafkaMetricDef.REPLICATION_BYTES_IN_RATE.name()).id();

    private KafkaCruiseControl _kafkaCruiseControl;
    private boolean _slowBrokerRemovalEnabled;
    /** Current slowness score of every tracked broker. */
    private final Map<BrokerEntity, Integer> _brokerSlownessScore = new HashMap<>();
    /** Time (ms) at which each tracked broker was first reported as slow. */
    private final Map<BrokerEntity, Long> _detectedSlowBrokers = new HashMap<>();
    /** Reused percentile calculator; its data set is replaced before every evaluation. */
    private final Percentile _percentile = new Percentile();
    private double _bytesInRateDetectionThreshold;
    private double _logFlushTimeThresholdMs;
    private double _metricHistoryPercentile;
    private double _metricHistoryMargin;
    private double _peerMetricPercentile;
    private double _peerMetricMargin;
    private int _slowBrokerDemotionScore;
    private int _slowBrokerDecommissionScore;
    private double _selfHealingUnfixableRatio;

    /**
     * Runs one detection pass over the current broker metrics.
     *
     * @param metricsHistoryByBroker historical metric values per broker; a broker may be absent.
     * @param currentMetricsByBroker current metric values per broker.
     * @return brokers that are anomalous for log flush time, anomalous for per-byte log flush time, and whose
     *         log flush time exceeds the configured absolute threshold.
     */
    private Set<BrokerEntity> detectMetricAnomalies(Map<BrokerEntity, ValuesAndExtrapolations> metricsHistoryByBroker,
                                                    Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker) {
        Map<BrokerEntity, List<Double>> historicalLogFlushTime = new HashMap<>();
        Map<BrokerEntity, Double> currentLogFlushTime = new HashMap<>();
        Map<BrokerEntity, List<Double>> historicalPerByteLogFlushTime = new HashMap<>();
        Map<BrokerEntity, Double> currentPerByteLogFlushTime = new HashMap<>();
        Set<Integer> skippedBrokerIds = new HashSet<>();

        for (BrokerEntity broker : currentMetricsByBroker.keySet()) {
            if (brokerHasNegligibleTraffic(broker, currentMetricsByBroker)) {
                skippedBrokerIds.add(broker.brokerId());
                continue;
            }
            collectLogFlushTimeMetric(broker, metricsHistoryByBroker, currentMetricsByBroker,
                                      historicalLogFlushTime, currentLogFlushTime);
            collectPerByteLogFlushTimeMetric(broker, metricsHistoryByBroker, currentMetricsByBroker,
                                             historicalPerByteLogFlushTime, currentPerByteLogFlushTime);
        }
        if (!skippedBrokerIds.isEmpty()) {
            LOG.info("Skip broker slowness checking for brokers {} because they serve negligible traffic.", skippedBrokerIds);
        }

        // A broker must fail all three checks to be reported.
        Set<BrokerEntity> slowBrokers = getMetricAnomalies(historicalLogFlushTime, currentLogFlushTime);
        slowBrokers.retainAll(getMetricAnomalies(historicalPerByteLogFlushTime, currentPerByteLogFlushTime));
        slowBrokers.retainAll(getLogFlushTimeMetricAnomaliesFromValue(currentLogFlushTime));
        return slowBrokers;
    }

    /** Returns the sum of the latest leader bytes-in and replication bytes-in values. */
    private static double latestTotalBytesInRate(AggregatedMetricValues metricValues) {
        return metricValues.valuesFor(LEADER_BYTES_IN_ID).latest() + metricValues.valuesFor(REPLICATION_BYTES_IN_RATE_ID).latest();
    }

    /**
     * @return {@code true} if the broker's latest total bytes-in rate is below the configured detection
     *         threshold, in which case the broker is excluded from the slowness checks.
     */
    private boolean brokerHasNegligibleTraffic(BrokerEntity brokerEntity, Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker) {
        double totalBytesInRate = latestTotalBytesInRate(currentMetricsByBroker.get(brokerEntity).metricValues());
        LOG.debug("Broker {}'s total bytes in rate is {} KB/s.", brokerEntity.brokerId(), totalBytesInRate);
        return totalBytesInRate < this._bytesInRateDetectionThreshold;
    }

    /**
     * Records the broker's latest log flush time and, when history is available, its historical log flush time
     * samples that are above {@link #HISTORICAL_LOG_FLUSH_TIME_FLOOR}.
     *
     * @param brokerEntity the broker to collect metrics for.
     * @param metricsHistoryByBroker historical metric values per broker.
     * @param currentMetricsByBroker current metric values per broker.
     * @param historyValueByBroker output: filtered historical samples per broker (untouched if history is missing).
     * @param currentValueByBroker output: latest value per broker.
     */
    private void collectLogFlushTimeMetric(BrokerEntity brokerEntity,
                                           Map<BrokerEntity, ValuesAndExtrapolations> metricsHistoryByBroker,
                                           Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker,
                                           Map<BrokerEntity, List<Double>> historyValueByBroker,
                                           Map<BrokerEntity, Double> currentValueByBroker) {
        currentValueByBroker.put(brokerEntity,
                                 currentMetricsByBroker.get(brokerEntity).metricValues().valuesFor(BROKER_LOG_FLUSH_TIME_MS_999TH_ID).latest());

        ValuesAndExtrapolations history = metricsHistoryByBroker.get(brokerEntity);
        if (history == null) {
            LOG.debug("Metric history for broker {} is missing. This may be due to a newly joined broker or Cruise Control cold start.", brokerEntity.brokerId());
            return;
        }
        double[] samples = history.metricValues().valuesFor(BROKER_LOG_FLUSH_TIME_MS_999TH_ID).doubleArray();
        List<Double> retained = new ArrayList<>(samples.length);
        for (double sample : samples) {
            if (sample > HISTORICAL_LOG_FLUSH_TIME_FLOOR) {
                retained.add(sample);
            }
        }
        historyValueByBroker.put(brokerEntity, retained);
    }

    /**
     * Records the broker's latest log flush time divided by its latest total bytes-in rate and, when history is
     * available, the same ratio for every historical window whose total bytes-in rate reaches the detection
     * threshold.
     *
     * @param brokerEntity the broker to collect metrics for.
     * @param metricsHistoryByBroker historical metric values per broker.
     * @param currentMetricsByBroker current metric values per broker.
     * @param historyValueByBroker output: historical ratios per broker (untouched if history is missing).
     * @param currentValueByBroker output: latest ratio per broker.
     */
    private void collectPerByteLogFlushTimeMetric(BrokerEntity brokerEntity,
                                                  Map<BrokerEntity, ValuesAndExtrapolations> metricsHistoryByBroker,
                                                  Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker,
                                                  Map<BrokerEntity, List<Double>> historyValueByBroker,
                                                  Map<BrokerEntity, Double> currentValueByBroker) {
        AggregatedMetricValues current = currentMetricsByBroker.get(brokerEntity).metricValues();
        currentValueByBroker.put(brokerEntity,
                                 current.valuesFor(BROKER_LOG_FLUSH_TIME_MS_999TH_ID).latest() / latestTotalBytesInRate(current));

        ValuesAndExtrapolations history = metricsHistoryByBroker.get(brokerEntity);
        if (history == null) {
            LOG.debug("Metric history for broker {} is missing. This may be due to a newly joined broker or Cruise Control cold start.", brokerEntity.brokerId());
            return;
        }
        AggregatedMetricValues historyValues = history.metricValues();
        double[] leaderBytesIn = historyValues.valuesFor(LEADER_BYTES_IN_ID).doubleArray();
        double[] replicationBytesIn = historyValues.valuesFor(REPLICATION_BYTES_IN_RATE_ID).doubleArray();
        double[] logFlushTime = historyValues.valuesFor(BROKER_LOG_FLUSH_TIME_MS_999TH_ID).doubleArray();
        List<Double> ratios = new ArrayList<>(leaderBytesIn.length);
        for (int i = 0; i < leaderBytesIn.length; i++) {
            double totalBytesIn = leaderBytesIn[i] + replicationBytesIn[i];
            // Windows with negligible traffic are skipped, mirroring the check applied to current metrics.
            if (totalBytesIn >= this._bytesInRateDetectionThreshold) {
                ratios.add(logFlushTime[i] / totalBytesIn);
            }
        }
        historyValueByBroker.put(brokerEntity, ratios);
    }

    /**
     * @return the union of the brokers that are anomalous relative to their own history and the brokers that
     *         are anomalous relative to their peers, as a new mutable set.
     */
    private Set<BrokerEntity> getMetricAnomalies(Map<BrokerEntity, List<Double>> historyValueByBroker,
                                                 Map<BrokerEntity, Double> currentValueByBroker) {
        Set<BrokerEntity> anomalies = new HashSet<>();
        detectMetricAnomaliesFromHistory(historyValueByBroker, currentValueByBroker, anomalies);
        detectMetricAnomaliesFromPeers(currentValueByBroker, anomalies);
        return anomalies;
    }

    /**
     * Adds to {@code detectedAnomalies} every broker whose current value is greater than the configured
     * percentile of its own history multiplied by the history margin. Brokers without history, or with too
     * little history according to {@link PercentileMetricAnomalyFinderUtils#isDataSufficient}, are skipped.
     */
    private void detectMetricAnomaliesFromHistory(Map<BrokerEntity, List<Double>> historyValueByBroker,
                                                  Map<BrokerEntity, Double> currentValueByBroker,
                                                  Set<BrokerEntity> detectedAnomalies) {
        for (Map.Entry<BrokerEntity, Double> current : currentValueByBroker.entrySet()) {
            BrokerEntity broker = current.getKey();
            List<Double> history = historyValueByBroker.get(broker);
            if (history == null
                || !PercentileMetricAnomalyFinderUtils.isDataSufficient(history.size(), this._metricHistoryPercentile, this._metricHistoryPercentile)) {
                continue;
            }
            this._percentile.setData(history.stream().mapToDouble(Double::doubleValue).toArray());
            double baseline = this._percentile.evaluate(this._metricHistoryPercentile);
            if (current.getValue() > baseline * this._metricHistoryMargin) {
                detectedAnomalies.add(broker);
            }
        }
    }

    /**
     * Adds to {@code detectedAnomalies} every broker whose current value is greater than the configured
     * percentile across all brokers multiplied by the peer margin. Does nothing when there are too few brokers
     * according to {@link PercentileMetricAnomalyFinderUtils#isDataSufficient}.
     */
    private void detectMetricAnomaliesFromPeers(Map<BrokerEntity, Double> currentValueByBroker, Set<BrokerEntity> detectedAnomalies) {
        if (!PercentileMetricAnomalyFinderUtils.isDataSufficient(currentValueByBroker.size(), this._peerMetricPercentile, this._peerMetricPercentile)) {
            return;
        }
        this._percentile.setData(currentValueByBroker.values().stream().mapToDouble(Double::doubleValue).toArray());
        double baseline = this._percentile.evaluate(this._peerMetricPercentile);
        for (Map.Entry<BrokerEntity, Double> current : currentValueByBroker.entrySet()) {
            if (current.getValue() > baseline * this._peerMetricMargin) {
                detectedAnomalies.add(current.getKey());
            }
        }
    }

    /**
     * @return brokers whose current log flush time is greater than the configured absolute threshold.
     */
    private Set<BrokerEntity> getLogFlushTimeMetricAnomaliesFromValue(Map<BrokerEntity, Double> currentValueByBroker) {
        Set<BrokerEntity> anomalies = new HashSet<>();
        for (Map.Entry<BrokerEntity, Double> current : currentValueByBroker.entrySet()) {
            if (current.getValue() > this._logFlushTimeThresholdMs) {
                anomalies.add(current.getKey());
            }
        }
        return anomalies;
    }

    /**
     * Instantiates the configured metric anomaly class as a {@link SlowBrokers} anomaly.
     *
     * @param detectedBrokers slow brokers mapped to the time (ms) they were first detected.
     * @param fixable value passed as the anomaly's "fixable" setting.
     * @param removeSlowBroker value passed as the {@link #REMOVE_SLOW_BROKER_CONFIG} setting.
     * @param description human-readable description of the anomaly.
     * @return the configured anomaly instance.
     */
    private SlowBrokers createSlowBrokersAnomaly(Map<BrokerEntity, Long> detectedBrokers, boolean fixable, boolean removeSlowBroker, String description) {
        Map<String, Object> overrides = new HashMap<>();
        overrides.put(AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG, this._kafkaCruiseControl);
        overrides.put(MetricAnomalyDetector.METRIC_ANOMALY_DESCRIPTION_OBJECT_CONFIG, description);
        overrides.put(MetricAnomalyDetector.METRIC_ANOMALY_BROKER_ENTITIES_OBJECT_CONFIG, detectedBrokers);
        overrides.put(REMOVE_SLOW_BROKER_CONFIG, removeSlowBroker);
        overrides.put(AnomalyDetectorUtils.ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG, this._kafkaCruiseControl.timeMs());
        overrides.put(MetricAnomalyDetector.METRIC_ANOMALY_FIXABLE_OBJECT_CONFIG, fixable);
        return this._kafkaCruiseControl.config().getConfiguredInstance(AnomalyDetectorConfig.METRIC_ANOMALY_CLASS_CONFIG, SlowBrokers.class, overrides);
    }

    /**
     * Builds a description of the form
     * <code>{Broker 1's performance degraded at &lt;date&gt;, Broker 2's ...}</code>.
     *
     * @param detectedBrokers slow brokers mapped to the time (ms) they were first detected.
     * @return the description; <code>{}</code> when {@code detectedBrokers} is empty.
     */
    private String getSlowBrokerDescription(Map<BrokerEntity, Long> detectedBrokers) {
        StringBuilder description = new StringBuilder().append("{");
        detectedBrokers.forEach((broker, detectedAtMs) ->
            description.append("Broker ").append(broker.brokerId()).append("'s performance degraded at ")
                       .append(KafkaCruiseControlUtils.toDateString(detectedAtMs.longValue())).append(", "));
        if (!detectedBrokers.isEmpty()) {
            // Drop the trailing ", " separator; skipped for an empty map to avoid a negative length.
            description.setLength(description.length() - 2);
        }
        description.append("}");
        return description.toString();
    }

    /**
     * Detects slow brokers, updates their slowness scores and returns the resulting anomalies.
     *
     * <p>Any {@link Exception} raised during detection is logged and results in an empty collection.
     *
     * @param metricsHistoryByBroker historical metric values per broker.
     * @param currentMetricsByBroker current metric values per broker.
     * @return the slow broker anomalies found in this round, possibly empty.
     */
    @Override
    public Collection<MetricAnomaly<BrokerEntity>> metricAnomalies(Map<BrokerEntity, ValuesAndExtrapolations> metricsHistoryByBroker,
                                                                   Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker) {
        LOG.info("Slow broker detection started.");
        try {
            Set<BrokerEntity> detected = detectMetricAnomalies(metricsHistoryByBroker, currentMetricsByBroker);
            updateBrokerSlownessScore(detected);
            return createSlowBrokerAnomalies(detected, metricsHistoryByBroker.size());
        } catch (Exception e) {
            LOG.warn("Slow broker detector encountered exception: ", e);
            return Collections.emptySet();
        } finally {
            LOG.info("Slow broker detection finished.");
        }
    }

    /**
     * Raises the score of every broker detected in this round (capped at the decommission score), lowers the
     * score of every other tracked broker by one, and forgets brokers whose score is no longer positive.
     *
     * @param detectedMetricAnomalies brokers reported as slow in the current round.
     */
    private void updateBrokerSlownessScore(Set<BrokerEntity> detectedMetricAnomalies) {
        for (BrokerEntity detected : detectedMetricAnomalies) {
            this._detectedSlowBrokers.putIfAbsent(detected, this._kafkaCruiseControl.timeMs());
            this._brokerSlownessScore.compute(detected, (ignoredKey, previous) ->
                previous == null ? 1 : Math.min(previous + 1, this._slowBrokerDecommissionScore));
        }

        Iterator<Map.Entry<BrokerEntity, Integer>> tracked = this._brokerSlownessScore.entrySet().iterator();
        while (tracked.hasNext()) {
            Map.Entry<BrokerEntity, Integer> entry = tracked.next();
            BrokerEntity broker = entry.getKey();
            Integer score = entry.getValue();
            if (detectedMetricAnomalies.contains(broker) || score == null) {
                continue;
            }
            int decayed = score - 1;
            // "<= 0" rather than "== 0": a decommission score of 0 can clamp a score to 0, and the next decay
            // would then step past zero and leave the broker tracked forever.
            if (decayed <= 0) {
                tracked.remove();
                this._detectedSlowBrokers.remove(broker);
            } else {
                entry.setValue(decayed);
            }
        }
    }

    /**
     * Turns the brokers detected in this round into anomalies based on their slowness score.
     *
     * <p>Brokers at the decommission score and brokers at or above the demotion score are grouped separately.
     * If together they outnumber {@code metricHistorySize * slow.broker.self.healing.unfixable.ratio}, a single
     * anomaly marked as not fixable is created for all of them. Otherwise one fixable anomaly is created for
     * the demotion group, and one for the decommission group whose fixable flag follows
     * {@code self.healing.slow.broker.removal.enabled}.
     *
     * @param detectedMetricAnomalies brokers reported as slow in the current round; each must have a score.
     * @param metricHistorySize number of brokers in the metric history, used to scale the unfixable ratio.
     * @return the anomalies to report, possibly empty.
     */
    private Set<MetricAnomaly<BrokerEntity>> createSlowBrokerAnomalies(Set<BrokerEntity> detectedMetricAnomalies, int metricHistorySize) {
        Set<MetricAnomaly<BrokerEntity>> anomalies = new HashSet<>();
        Map<BrokerEntity, Long> brokersToDemote = new HashMap<>();
        Map<BrokerEntity, Long> brokersToRemove = new HashMap<>();
        for (BrokerEntity broker : detectedMetricAnomalies) {
            int score = this._brokerSlownessScore.get(broker);
            if (score >= this._slowBrokerDecommissionScore) {
                brokersToRemove.put(broker, this._detectedSlowBrokers.get(broker));
            } else if (score >= this._slowBrokerDemotionScore) {
                brokersToDemote.put(broker, this._detectedSlowBrokers.get(broker));
            }
        }

        if (brokersToDemote.size() + brokersToRemove.size() > metricHistorySize * this._selfHealingUnfixableRatio) {
            // Too many slow brokers at once: report them together as a single unfixable anomaly.
            brokersToDemote.putAll(brokersToRemove);
            anomalies.add(createSlowBrokersAnomaly(brokersToDemote, false, false, getSlowBrokerDescription(brokersToDemote)));
            return anomalies;
        }
        if (!brokersToDemote.isEmpty()) {
            anomalies.add(createSlowBrokersAnomaly(brokersToDemote, true, false, getSlowBrokerDescription(brokersToDemote)));
        }
        if (!brokersToRemove.isEmpty()) {
            anomalies.add(createSlowBrokersAnomaly(brokersToRemove, this._slowBrokerRemovalEnabled, true, getSlowBrokerDescription(brokersToRemove)));
        }
        return anomalies;
    }

    /**
     * Reads the detector settings from the original Cruise Control configuration.
     *
     * @param configs must contain the {@link KafkaCruiseControl} instance under
     *        {@link AnomalyDetectorUtils#KAFKA_CRUISE_CONTROL_OBJECT_CONFIG}.
     * @throws IllegalArgumentException if the {@link KafkaCruiseControl} instance is missing.
     */
    @Override
    public void configure(Map<String, ?> configs) {
        this._kafkaCruiseControl = (KafkaCruiseControl) configs.get(AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG);
        if (this._kafkaCruiseControl == null) {
            throw new IllegalArgumentException("Slow broker detector is missing kafka.cruise.control.object");
        }
        Map<String, Object> originals = this._kafkaCruiseControl.config().originals();

        // Accept the flag as either a String or a Boolean; a missing value means "disabled".
        Object removalEnabled = originals.get(SELF_HEALING_SLOW_BROKER_REMOVAL_ENABLED_CONFIG);
        this._slowBrokerRemovalEnabled = removalEnabled != null && Boolean.parseBoolean(removalEnabled.toString());

        // Each predicate below is passed as the last argument of parseAndGetConfig and describes an out-of-range value.
        this._bytesInRateDetectionThreshold = AnomalyUtils.parseAndGetConfig(
            originals, SLOW_BROKER_BYTES_IN_RATE_DETECTION_THRESHOLD_CONFIG, DEFAULT_SLOW_BROKER_BYTES_IN_RATE_DETECTION_THRESHOLD,
            (Predicate<Double>) value -> value < 0.0d);
        this._logFlushTimeThresholdMs = AnomalyUtils.parseAndGetConfig(
            originals, SLOW_BROKER_LOG_FLUSH_TIME_THRESHOLD_MS_CONFIG, DEFAULT_SLOW_BROKER_LOG_FLUSH_TIME_THRESHOLD_MS_CONFIG,
            (Predicate<Double>) value -> value < 0.0d);
        this._metricHistoryPercentile = AnomalyUtils.parseAndGetConfig(
            originals, SLOW_BROKER_METRIC_HISTORY_PERCENTILE_THRESHOLD_CONFIG, DEFAULT_SLOW_BROKER_METRIC_HISTORY_PERCENTILE_THRESHOLD,
            (Predicate<Double>) value -> value < 0.0d || value > 100.0d);
        this._metricHistoryMargin = AnomalyUtils.parseAndGetConfig(
            originals, SLOW_BROKER_METRIC_HISTORY_MARGIN_CONFIG, DEFAULT_SLOW_BROKER_METRIC_HISTORY_MARGIN,
            (Predicate<Double>) value -> value < 1.0d);
        this._peerMetricPercentile = AnomalyUtils.parseAndGetConfig(
            originals, SLOW_BROKER_PEER_METRIC_PERCENTILE_THRESHOLD_CONFIG, DEFAULT_SLOW_BROKER_PEER_METRIC_PERCENTILE_THRESHOLD,
            (Predicate<Double>) value -> value < 0.0d || value > 100.0d);
        this._peerMetricMargin = AnomalyUtils.parseAndGetConfig(
            originals, SLOW_BROKER_PEER_METRIC_MARGIN_CONFIG, DEFAULT_SLOW_BROKER_PEER_METRIC_MARGIN,
            (Predicate<Double>) value -> value < 1.0d);
        this._slowBrokerDemotionScore = AnomalyUtils.parseAndGetConfig(
            originals, SLOW_BROKER_DEMOTION_SCORE_CONFIG, DEFAULT_SLOW_BROKER_DEMOTION_SCORE,
            (Predicate<Integer>) value -> value < 0);
        this._slowBrokerDecommissionScore = AnomalyUtils.parseAndGetConfig(
            originals, SLOW_BROKER_DECOMMISSION_SCORE_CONFIG, DEFAULT_SLOW_BROKER_DECOMMISSION_SCORE,
            (Predicate<Integer>) value -> value < 0);
        this._selfHealingUnfixableRatio = AnomalyUtils.parseAndGetConfig(
            originals, SLOW_BROKER_SELF_HEALING_UNFIXABLE_RATIO_CONFIG, DEFAULT_SLOW_BROKER_SELF_HEALING_UNFIXABLE_RATIO,
            (Predicate<Double>) value -> value < 0.0d || value > 1.0d);
    }
}
