/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomaly;
import com.linkedin.cruisecontrol.detector.metricanomaly.MetricAnomalyFinder;
import com.linkedin.cruisecontrol.detector.metricanomaly.PercentileMetricAnomalyFinderUtils;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.AggregatedMetricValues;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyUtils;
import com.linkedin.kafka.cruisecontrol.detector.SlowBrokers;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerEntity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.math3.stat.descriptive.rank.Percentile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SlowBrokerFinder
implements MetricAnomalyFinder<BrokerEntity> {
    private static final Logger LOG = LoggerFactory.getLogger(SlowBrokerFinder.class);
    public static final String SELF_HEALING_SLOW_BROKER_REMOVAL_ENABLED_CONFIG = "self.healing.slow.broker.removal.enabled";
    public static final String REMOVE_SLOW_BROKER_CONFIG = "remove.slow.broker";
    public static final String SLOW_BROKER_BYTES_IN_RATE_DETECTION_THRESHOLD_CONFIG = "slow.broker.bytes.in.rate.detection.threshold";
    public static final double DEFAULT_SLOW_BROKER_BYTES_IN_RATE_DETECTION_THRESHOLD = 1024.0;
    public static final String SLOW_BROKER_LOG_FLUSH_TIME_THRESHOLD_MS_CONFIG = "slow.broker.log.flush.time.threshold.ms";
    public static final double DEFAULT_SLOW_BROKER_LOG_FLUSH_TIME_THRESHOLD_MS_CONFIG = 150.0;
    public static final String SLOW_BROKER_METRIC_HISTORY_PERCENTILE_THRESHOLD_CONFIG = "slow.broker.metric.history.percentile.threshold";
    public static final double DEFAULT_SLOW_BROKER_METRIC_HISTORY_PERCENTILE_THRESHOLD = 90.0;
    public static final String SLOW_BROKER_METRIC_HISTORY_MARGIN_CONFIG = "slow.broker.metric.history.margin";
    public static final double DEFAULT_SLOW_BROKER_METRIC_HISTORY_MARGIN = 3.0;
    public static final String SLOW_BROKER_PEER_METRIC_PERCENTILE_THRESHOLD_CONFIG = "slow.broker.peer.metric.percentile.threshold";
    public static final double DEFAULT_SLOW_BROKER_PEER_METRIC_PERCENTILE_THRESHOLD = 50.0;
    public static final String SLOW_BROKER_PEER_METRIC_MARGIN_CONFIG = "slow.broker.peer.metric.margin";
    public static final double DEFAULT_SLOW_BROKER_PEER_METRIC_MARGIN = 3.0;
    public static final String SLOW_BROKER_DEMOTION_SCORE_CONFIG = "slow.broker.demotion.score";
    public static final int DEFAULT_SLOW_BROKER_DEMOTION_SCORE = 5;
    public static final String SLOW_BROKER_DECOMMISSION_SCORE_CONFIG = "slow.broker.decommission.score";
    public static final int DEFAULT_SLOW_BROKER_DECOMMISSION_SCORE = 50;
    public static final String SLOW_BROKER_SELF_HEALING_UNFIXABLE_RATIO_CONFIG = "slow.broker.self.healing.unfixable.ratio";
    private static final double DEFAULT_SLOW_BROKER_SELF_HEALING_UNFIXABLE_RATIO = 0.1;
    private static final short BROKER_LOG_FLUSH_TIME_MS_999TH_ID = KafkaMetricDef.brokerMetricDef().metricInfo(KafkaMetricDef.BROKER_LOG_FLUSH_TIME_MS_999TH.name()).id();
    private static final short LEADER_BYTES_IN_ID = KafkaMetricDef.brokerMetricDef().metricInfo(KafkaMetricDef.LEADER_BYTES_IN.name()).id();
    private static final short REPLICATION_BYTES_IN_RATE_ID = KafkaMetricDef.brokerMetricDef().metricInfo(KafkaMetricDef.REPLICATION_BYTES_IN_RATE.name()).id();
    private KafkaCruiseControl _kafkaCruiseControl;
    private boolean _slowBrokerRemovalEnabled;
    private final Map<BrokerEntity, Integer> _brokerSlownessScore = new HashMap<BrokerEntity, Integer>();
    private final Map<BrokerEntity, Long> _detectedSlowBrokers = new HashMap<BrokerEntity, Long>();
    private final Percentile _percentile = new Percentile();
    private double _bytesInRateDetectionThreshold;
    private double _logFlushTimeThresholdMs;
    private double _metricHistoryPercentile;
    private double _metricHistoryMargin;
    private double _peerMetricPercentile;
    private double _peerMetricMargin;
    private int _slowBrokerDemotionScore;
    private int _slowBrokerDecommissionScore;
    private double _selfHealingUnfixableRatio;

    private Set<BrokerEntity> detectMetricAnomalies(Map<BrokerEntity, ValuesAndExtrapolations> metricsHistoryByBroker, Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker) {
        HashMap<BrokerEntity, List<Double>> historicalLogFlushTimeMetricValues = new HashMap<BrokerEntity, List<Double>>();
        HashMap<BrokerEntity, Double> currentLogFlushTimeMetricValues = new HashMap<BrokerEntity, Double>();
        HashMap<BrokerEntity, List<Double>> historicalPerByteLogFlushTimeMetricValues = new HashMap<BrokerEntity, List<Double>>();
        HashMap<BrokerEntity, Double> currentPerByteLogFlushTimeMetricValues = new HashMap<BrokerEntity, Double>();
        HashSet<Integer> skippedBrokers = new HashSet<Integer>();
        for (BrokerEntity broker : currentMetricsByBroker.keySet()) {
            if (!this.brokerHasNegligibleTraffic(broker, currentMetricsByBroker)) {
                this.collectLogFlushTimeMetric(broker, metricsHistoryByBroker, currentMetricsByBroker, historicalLogFlushTimeMetricValues, currentLogFlushTimeMetricValues);
                this.collectPerByteLogFlushTimeMetric(broker, metricsHistoryByBroker, currentMetricsByBroker, historicalPerByteLogFlushTimeMetricValues, currentPerByteLogFlushTimeMetricValues);
                continue;
            }
            skippedBrokers.add(broker.brokerId());
        }
        if (!skippedBrokers.isEmpty()) {
            LOG.info("Skip broker slowness checking for brokers {} because they serve negligible traffic.", skippedBrokers);
        }
        Set<BrokerEntity> detectedMetricAnomalies = this.getMetricAnomalies(historicalLogFlushTimeMetricValues, currentLogFlushTimeMetricValues);
        detectedMetricAnomalies.retainAll(this.getMetricAnomalies(historicalPerByteLogFlushTimeMetricValues, currentPerByteLogFlushTimeMetricValues));
        detectedMetricAnomalies.retainAll(this.getLogFlushTimeMetricAnomaliesFromValue(currentLogFlushTimeMetricValues));
        return detectedMetricAnomalies;
    }

    private boolean brokerHasNegligibleTraffic(BrokerEntity broker, Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker) {
        AggregatedMetricValues aggregatedMetricValues = currentMetricsByBroker.get((Object)broker).metricValues();
        double latestTotalBytesIn = aggregatedMetricValues.valuesFor(LEADER_BYTES_IN_ID).latest() + aggregatedMetricValues.valuesFor(REPLICATION_BYTES_IN_RATE_ID).latest();
        LOG.debug("Broker {}'s total bytes in rate is {} KB/s.", (Object)broker.brokerId(), (Object)latestTotalBytesIn);
        return latestTotalBytesIn < this._bytesInRateDetectionThreshold;
    }

    private void collectLogFlushTimeMetric(BrokerEntity broker, Map<BrokerEntity, ValuesAndExtrapolations> metricsHistoryByBroker, Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker, Map<BrokerEntity, List<Double>> historicalLogFlushTimeMetricValues, Map<BrokerEntity, Double> currentLogFlushTimeMetricValues) {
        AggregatedMetricValues aggregatedMetricValues = currentMetricsByBroker.get((Object)broker).metricValues();
        double latestLogFlushTime = aggregatedMetricValues.valuesFor(BROKER_LOG_FLUSH_TIME_MS_999TH_ID).latest();
        currentLogFlushTimeMetricValues.put(broker, latestLogFlushTime);
        if (metricsHistoryByBroker.get((Object)broker) != null) {
            aggregatedMetricValues = metricsHistoryByBroker.get((Object)broker).metricValues();
            double[] historicalLogFlushTime = aggregatedMetricValues.valuesFor(BROKER_LOG_FLUSH_TIME_MS_999TH_ID).doubleArray();
            ArrayList<Double> historicalValue = new ArrayList<Double>(historicalLogFlushTime.length);
            for (int i = 0; i < historicalLogFlushTime.length; ++i) {
                if (!(historicalLogFlushTime[i] > 5.0)) continue;
                historicalValue.add(historicalLogFlushTime[i]);
            }
            historicalLogFlushTimeMetricValues.put(broker, historicalValue);
        } else {
            LOG.debug("Metric history for broker {} is missing. This may be due to a newly joined broker or Cruise Control cold start.", (Object)broker.brokerId());
        }
    }

    private void collectPerByteLogFlushTimeMetric(BrokerEntity broker, Map<BrokerEntity, ValuesAndExtrapolations> metricsHistoryByBroker, Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker, Map<BrokerEntity, List<Double>> historicalPerByteLogFlushTimeMetricValues, Map<BrokerEntity, Double> currentPerByteLogFlushTimeMetricValues) {
        AggregatedMetricValues aggregatedMetricValues = currentMetricsByBroker.get((Object)broker).metricValues();
        double latestLogFlushTime = aggregatedMetricValues.valuesFor(BROKER_LOG_FLUSH_TIME_MS_999TH_ID).latest();
        double latestTotalBytesIn = aggregatedMetricValues.valuesFor(LEADER_BYTES_IN_ID).latest() + aggregatedMetricValues.valuesFor(REPLICATION_BYTES_IN_RATE_ID).latest();
        currentPerByteLogFlushTimeMetricValues.put(broker, latestLogFlushTime / latestTotalBytesIn);
        if (metricsHistoryByBroker.get((Object)broker) != null) {
            aggregatedMetricValues = metricsHistoryByBroker.get((Object)broker).metricValues();
            double[] historicalBytesIn = aggregatedMetricValues.valuesFor(LEADER_BYTES_IN_ID).doubleArray();
            double[] historicalReplicationBytesIn = aggregatedMetricValues.valuesFor(REPLICATION_BYTES_IN_RATE_ID).doubleArray();
            double[] historicalLogFlushTime = aggregatedMetricValues.valuesFor(BROKER_LOG_FLUSH_TIME_MS_999TH_ID).doubleArray();
            ArrayList<Double> historicalValue = new ArrayList<Double>(historicalBytesIn.length);
            for (int i = 0; i < historicalBytesIn.length; ++i) {
                double totalBytesIn = historicalBytesIn[i] + historicalReplicationBytesIn[i];
                if (!(totalBytesIn >= this._bytesInRateDetectionThreshold)) continue;
                historicalValue.add(historicalLogFlushTime[i] / totalBytesIn);
            }
            historicalPerByteLogFlushTimeMetricValues.put(broker, historicalValue);
        } else {
            LOG.debug("Metric history for broker {} is missing. This may be due to a newly joined broker or Cruise Control cold start.", (Object)broker.brokerId());
        }
    }

    private Set<BrokerEntity> getMetricAnomalies(Map<BrokerEntity, List<Double>> historicalValueByBroker, Map<BrokerEntity, Double> currentValueByBroker) {
        HashSet<BrokerEntity> detectedMetricAnomalies = new HashSet<BrokerEntity>();
        this.detectMetricAnomaliesFromHistory(historicalValueByBroker, currentValueByBroker, detectedMetricAnomalies);
        this.detectMetricAnomaliesFromPeers(currentValueByBroker, detectedMetricAnomalies);
        return detectedMetricAnomalies;
    }

    private void detectMetricAnomaliesFromHistory(Map<BrokerEntity, List<Double>> historicalValue, Map<BrokerEntity, Double> currentValue, Set<BrokerEntity> detectedMetricAnomalies) {
        for (Map.Entry<BrokerEntity, Double> entry : currentValue.entrySet()) {
            BrokerEntity entity = entry.getKey();
            if (historicalValue.get((Object)entity) == null || !PercentileMetricAnomalyFinderUtils.isDataSufficient((int)historicalValue.get((Object)entity).size(), (double)this._metricHistoryPercentile, (double)this._metricHistoryPercentile)) continue;
            double[] data = historicalValue.get((Object)entity).stream().mapToDouble(i -> i).toArray();
            this._percentile.setData(data);
            if (!(currentValue.get((Object)entity) > this._percentile.evaluate(this._metricHistoryPercentile) * this._metricHistoryMargin)) continue;
            detectedMetricAnomalies.add(entity);
        }
    }

    private void detectMetricAnomaliesFromPeers(Map<BrokerEntity, Double> currentValue, Set<BrokerEntity> detectedMetricAnomalies) {
        if (PercentileMetricAnomalyFinderUtils.isDataSufficient((int)currentValue.size(), (double)this._peerMetricPercentile, (double)this._peerMetricPercentile)) {
            double[] data = currentValue.values().stream().mapToDouble(i -> i).toArray();
            this._percentile.setData(data);
            double base = this._percentile.evaluate(this._peerMetricPercentile);
            for (Map.Entry<BrokerEntity, Double> entry : currentValue.entrySet()) {
                if (!(currentValue.get((Object)entry.getKey()) > base * this._peerMetricMargin)) continue;
                detectedMetricAnomalies.add(entry.getKey());
            }
        }
    }

    private Set<BrokerEntity> getLogFlushTimeMetricAnomaliesFromValue(Map<BrokerEntity, Double> currentValue) {
        HashSet<BrokerEntity> detectedMetricAnomalies = new HashSet<BrokerEntity>();
        for (Map.Entry<BrokerEntity, Double> entry : currentValue.entrySet()) {
            if (!(currentValue.get((Object)entry.getKey()) > this._logFlushTimeThresholdMs)) continue;
            detectedMetricAnomalies.add(entry.getKey());
        }
        return detectedMetricAnomalies;
    }

    private SlowBrokers createSlowBrokersAnomaly(Map<BrokerEntity, Long> detectedBrokers, boolean fixable, boolean removeSlowBroker, String description) {
        HashMap<String, Object> parameterConfigOverrides = new HashMap<String, Object>(5);
        parameterConfigOverrides.put("kafka.cruise.control.object", this._kafkaCruiseControl);
        parameterConfigOverrides.put("metric.anomaly.description.object", description);
        parameterConfigOverrides.put("metric.anomaly.broker.entities.object", detectedBrokers);
        parameterConfigOverrides.put(REMOVE_SLOW_BROKER_CONFIG, removeSlowBroker);
        parameterConfigOverrides.put("anomaly.detection.time.ms.object", this._kafkaCruiseControl.timeMs());
        parameterConfigOverrides.put("metric.anomaly.fixable.object", fixable);
        return this._kafkaCruiseControl.config().getConfiguredInstance("metric.anomaly.class", SlowBrokers.class, parameterConfigOverrides);
    }

    private String getSlowBrokerDescription(Map<BrokerEntity, Long> detectedBrokers) {
        StringBuilder descriptionSb = new StringBuilder().append("{");
        detectedBrokers.forEach((key, value) -> descriptionSb.append("Broker ").append(key.brokerId()).append("'s performance degraded at ").append(KafkaCruiseControlUtils.toDateString(value)).append(", "));
        descriptionSb.setLength(descriptionSb.length() - 2);
        descriptionSb.append("}");
        return descriptionSb.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Collection<MetricAnomaly<BrokerEntity>> metricAnomalies(Map<BrokerEntity, ValuesAndExtrapolations> metricsHistoryByBroker, Map<BrokerEntity, ValuesAndExtrapolations> currentMetricsByBroker) {
        LOG.info("Slow broker detection started.");
        try {
            Set<BrokerEntity> detectedMetricAnomalies = this.detectMetricAnomalies(metricsHistoryByBroker, currentMetricsByBroker);
            this.updateBrokerSlownessScore(detectedMetricAnomalies);
            Set<MetricAnomaly<BrokerEntity>> set = this.createSlowBrokerAnomalies(detectedMetricAnomalies, metricsHistoryByBroker.size());
            return set;
        }
        catch (Exception e) {
            LOG.warn("Slow broker detector encountered exception: ", (Throwable)e);
        }
        finally {
            LOG.info("Slow broker detection finished.");
        }
        return Collections.emptySet();
    }

    private void updateBrokerSlownessScore(Set<BrokerEntity> detectedMetricAnomalies) {
        for (BrokerEntity broker : detectedMetricAnomalies) {
            Long currentTimeMs = this._kafkaCruiseControl.timeMs();
            this._detectedSlowBrokers.putIfAbsent(broker, currentTimeMs);
            this._brokerSlownessScore.compute(broker, (k, v) -> v == null ? 1 : Math.min(v + 1, this._slowBrokerDecommissionScore));
        }
        HashSet<BrokerEntity> brokersRecovered = new HashSet<BrokerEntity>();
        for (Map.Entry<BrokerEntity, Integer> entry : this._brokerSlownessScore.entrySet()) {
            BrokerEntity broker = entry.getKey();
            if (detectedMetricAnomalies.contains((Object)broker)) continue;
            Integer score = entry.getValue();
            if (score != null && (score = Integer.valueOf(score - 1)) == 0) {
                brokersRecovered.add(broker);
                continue;
            }
            entry.setValue(score);
        }
        for (BrokerEntity broker : brokersRecovered) {
            this._brokerSlownessScore.remove((Object)broker);
            this._detectedSlowBrokers.remove((Object)broker);
        }
    }

    private Set<MetricAnomaly<BrokerEntity>> createSlowBrokerAnomalies(Set<BrokerEntity> detectedMetricAnomalies, int clusterSize) {
        HashSet<MetricAnomaly<BrokerEntity>> detectedSlowBrokers = new HashSet<MetricAnomaly<BrokerEntity>>();
        HashMap<BrokerEntity, Long> brokersToDemote = new HashMap<BrokerEntity, Long>();
        HashMap<BrokerEntity, Long> brokersToRemove = new HashMap<BrokerEntity, Long>();
        for (BrokerEntity broker : detectedMetricAnomalies) {
            int slownessScore = this._brokerSlownessScore.get((Object)broker);
            if (slownessScore == this._slowBrokerDecommissionScore) {
                brokersToRemove.put(broker, this._detectedSlowBrokers.get((Object)broker));
                continue;
            }
            if (slownessScore < this._slowBrokerDemotionScore) continue;
            brokersToDemote.put(broker, this._detectedSlowBrokers.get((Object)broker));
        }
        if ((double)(brokersToDemote.size() + brokersToRemove.size()) > (double)clusterSize * this._selfHealingUnfixableRatio) {
            brokersToRemove.forEach(brokersToDemote::put);
            detectedSlowBrokers.add(this.createSlowBrokersAnomaly(brokersToDemote, false, false, this.getSlowBrokerDescription(brokersToDemote)));
        } else {
            if (!brokersToDemote.isEmpty()) {
                detectedSlowBrokers.add(this.createSlowBrokersAnomaly(brokersToDemote, true, false, this.getSlowBrokerDescription(brokersToDemote)));
            }
            if (!brokersToRemove.isEmpty()) {
                detectedSlowBrokers.add(this.createSlowBrokersAnomaly(brokersToRemove, this._slowBrokerRemovalEnabled, true, this.getSlowBrokerDescription(brokersToRemove)));
            }
        }
        return detectedSlowBrokers;
    }

    public void configure(Map<String, ?> configs) {
        this._kafkaCruiseControl = (KafkaCruiseControl)configs.get("kafka.cruise.control.object");
        if (this._kafkaCruiseControl == null) {
            throw new IllegalArgumentException("Slow broker detector is missing kafka.cruise.control.object");
        }
        Map originalConfig = this._kafkaCruiseControl.config().originals();
        this._slowBrokerRemovalEnabled = Boolean.parseBoolean((String)originalConfig.get(SELF_HEALING_SLOW_BROKER_REMOVAL_ENABLED_CONFIG));
        this._bytesInRateDetectionThreshold = AnomalyUtils.parseAndGetConfig((Map<String, Object>)originalConfig, SLOW_BROKER_BYTES_IN_RATE_DETECTION_THRESHOLD_CONFIG, 1024.0, val -> val < 0.0);
        this._logFlushTimeThresholdMs = AnomalyUtils.parseAndGetConfig((Map<String, Object>)originalConfig, SLOW_BROKER_LOG_FLUSH_TIME_THRESHOLD_MS_CONFIG, 150.0, val -> val < 0.0);
        this._metricHistoryPercentile = AnomalyUtils.parseAndGetConfig((Map<String, Object>)originalConfig, SLOW_BROKER_METRIC_HISTORY_PERCENTILE_THRESHOLD_CONFIG, 90.0, val -> val < 0.0 || val > 100.0);
        this._metricHistoryMargin = AnomalyUtils.parseAndGetConfig((Map<String, Object>)originalConfig, SLOW_BROKER_METRIC_HISTORY_MARGIN_CONFIG, 3.0, val -> val < 1.0);
        this._peerMetricPercentile = AnomalyUtils.parseAndGetConfig((Map<String, Object>)originalConfig, SLOW_BROKER_PEER_METRIC_PERCENTILE_THRESHOLD_CONFIG, 50.0, val -> val < 0.0 || val > 100.0);
        this._peerMetricMargin = AnomalyUtils.parseAndGetConfig((Map<String, Object>)originalConfig, SLOW_BROKER_PEER_METRIC_MARGIN_CONFIG, 3.0, val -> val < 1.0);
        this._slowBrokerDemotionScore = AnomalyUtils.parseAndGetConfig((Map<String, Object>)originalConfig, SLOW_BROKER_DEMOTION_SCORE_CONFIG, 5, val -> val < 0);
        this._slowBrokerDecommissionScore = AnomalyUtils.parseAndGetConfig((Map<String, Object>)originalConfig, SLOW_BROKER_DECOMMISSION_SCORE_CONFIG, 50, val -> val < 0);
        this._selfHealingUnfixableRatio = AnomalyUtils.parseAndGetConfig((Map<String, Object>)originalConfig, SLOW_BROKER_SELF_HEALING_UNFIXABLE_RATIO_CONFIG, 0.1, val -> val < 0.0 || val > 1.0);
    }
}

