package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityInfo;
import com.linkedin.kafka.cruisecontrol.exception.BrokerCapacityResolutionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.UnknownVersionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerLoad;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsProcessor.class */
public class CruiseControlMetricsProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(CruiseControlMetricsProcessor.class);
    private static final long INIT_METRIC_TIMESTAMP = -1;
    private final BrokerCapacityConfigResolver _brokerCapacityConfigResolver;
    private final boolean _allowCpuCapacityEstimation;
    private final Map<Integer, BrokerLoad> _brokerLoad = new HashMap();
    private final Map<Integer, Short> _cachedNumCoresByBroker = new HashMap();
    private long _maxMetricTimestamp = -1;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CruiseControlMetricsProcessor(BrokerCapacityConfigResolver brokerCapacityConfigResolver, boolean z) {
        this._brokerCapacityConfigResolver = brokerCapacityConfigResolver;
        this._allowCpuCapacityEstimation = z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addMetric(CruiseControlMetric cruiseControlMetric) {
        int brokerId = cruiseControlMetric.brokerId();
        LOG.trace("Adding cruise control metric {}", cruiseControlMetric);
        this._maxMetricTimestamp = Math.max(cruiseControlMetric.time(), this._maxMetricTimestamp);
        this._brokerLoad.compute(Integer.valueOf(brokerId), (num, brokerLoad) -> {
            BrokerLoad brokerLoad = brokerLoad == null ? new BrokerLoad() : brokerLoad;
            brokerLoad.recordMetric(cruiseControlMetric);
            return brokerLoad;
        });
    }

    private void updateCachedNumCoresByBroker(Cluster cluster) {
        Iterator<Integer> it = this._brokerLoad.keySet().iterator();
        while (it.hasNext()) {
            this._cachedNumCoresByBroker.computeIfAbsent(Integer.valueOf(it.next().intValue()), num -> {
                Node nodeById = cluster.nodeById(num.intValue());
                if (nodeById == null) {
                    LOG.warn("Received metrics from unrecognized broker {}.", num);
                    return null;
                }
                try {
                    BrokerCapacityInfo capacityForBroker = this._brokerCapacityConfigResolver.capacityForBroker(MonitorUtils.getRackHandleNull(nodeById), nodeById.host(), num.intValue(), 10000L, this._allowCpuCapacityEstimation);
                    if (capacityForBroker == null) {
                        return null;
                    }
                    return Short.valueOf(capacityForBroker.numCpuCores());
                } catch (BrokerCapacityResolutionException | TimeoutException e) {
                    LOG.warn("Unable to get number of CPU cores for broker {}.", Integer.valueOf(nodeById.id()), e);
                    return null;
                }
            });
        }
    }

    Map<Integer, Short> cachedNumCoresByBroker() {
        return this._cachedNumCoresByBroker;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MetricSampler.Samples process(Cluster cluster, Set<TopicPartition> set, MetricSampler.SamplingMode samplingMode) {
        updateCachedNumCoresByBroker(cluster);
        this._brokerLoad.forEach((num, brokerLoad) -> {
            brokerLoad.prepareBrokerMetrics(cluster, num.intValue(), this._maxMetricTimestamp);
        });
        Map<Integer, Integer> map = null;
        HashSet hashSet = new HashSet();
        if (samplingMode == MetricSampler.SamplingMode.ALL || samplingMode == MetricSampler.SamplingMode.PARTITION_METRICS_ONLY) {
            map = addPartitionMetricSamples(cluster, set, hashSet);
        }
        int i = 0;
        HashSet hashSet2 = new HashSet();
        if (samplingMode == MetricSampler.SamplingMode.ALL || samplingMode == MetricSampler.SamplingMode.BROKER_METRICS_ONLY) {
            i = addBrokerMetricSamples(cluster, hashSet2);
        }
        logProcess(samplingMode, map, i, hashSet, hashSet2);
        return new MetricSampler.Samples(hashSet, hashSet2);
    }

    private void logProcess(MetricSampler.SamplingMode samplingMode, Map<Integer, Integer> map, int i, Set<PartitionMetricSample> set, Set<BrokerMetricSample> set2) {
        switch (samplingMode) {
            case ALL:
                Logger logger = LOG;
                Object[] objArr = new Object[5];
                objArr[0] = Integer.valueOf(set.size());
                objArr[1] = !map.isEmpty() ? String.format("(%s skipped by broker %s)", Integer.valueOf(map.values().stream().mapToInt(num -> {
                    return num.intValue();
                }).sum()), map) : "";
                objArr[2] = Integer.valueOf(set2.size());
                objArr[3] = i > 0 ? "(" + i + " skipped)" : "";
                objArr[4] = Long.valueOf(this._maxMetricTimestamp);
                logger.info("Generated {}{} partition metric samples and {}{} broker metric samples for timestamp {}.", objArr);
                return;
            case PARTITION_METRICS_ONLY:
                Logger logger2 = LOG;
                Object[] objArr2 = new Object[3];
                objArr2[0] = Integer.valueOf(set.size());
                objArr2[1] = !map.isEmpty() ? String.format("(%s skipped by broker %s)", Integer.valueOf(map.values().stream().mapToInt(num2 -> {
                    return num2.intValue();
                }).sum()), map) : "";
                objArr2[2] = Long.valueOf(this._maxMetricTimestamp);
                logger2.info("Generated {}{} partition metric samples for timestamp {}.", objArr2);
                return;
            case BROKER_METRICS_ONLY:
                Logger logger3 = LOG;
                Object[] objArr3 = new Object[3];
                objArr3[0] = Integer.valueOf(set2.size());
                objArr3[1] = i > 0 ? "(" + i + " skipped)" : "";
                objArr3[2] = Long.valueOf(this._maxMetricTimestamp);
                logger3.info("Generated {}{} broker metric samples for timestamp {}.", objArr3);
                return;
            default:
                throw new IllegalStateException("Unknown sampling mode " + samplingMode);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clear() {
        this._brokerLoad.clear();
        this._maxMetricTimestamp = -1L;
    }

    private Map<Integer, Integer> addPartitionMetricSamples(Cluster cluster, Set<TopicPartition> set, Set<PartitionMetricSample> set2) {
        HashMap hashMap = new HashMap();
        Map<Integer, Map<String, Integer>> leaderDistribution = SamplingUtils.leaderDistribution(cluster);
        for (TopicPartition topicPartition : set) {
            try {
                PartitionMetricSample buildPartitionMetricSample = SamplingUtils.buildPartitionMetricSample(cluster, leaderDistribution, topicPartition, this._brokerLoad, this._maxMetricTimestamp, this._cachedNumCoresByBroker, hashMap);
                if (buildPartitionMetricSample != null) {
                    LOG.trace("Added partition metrics sample for {}.", topicPartition);
                    set2.add(buildPartitionMetricSample);
                }
            } catch (Exception e) {
                LOG.error("Error building partition metric sample for {}.", topicPartition, e);
                hashMap.merge(-1, 1, (v0, v1) -> {
                    return Integer.sum(v0, v1);
                });
            }
        }
        return hashMap;
    }

    private int addBrokerMetricSamples(Cluster cluster, Set<BrokerMetricSample> set) {
        int i = 0;
        for (Node node : cluster.nodes()) {
            try {
                BrokerMetricSample buildBrokerMetricSample = SamplingUtils.buildBrokerMetricSample(node, this._brokerLoad, this._maxMetricTimestamp);
                if (buildBrokerMetricSample != null) {
                    LOG.trace("Added broker metric sample for broker {}.", Integer.valueOf(node.id()));
                    set.add(buildBrokerMetricSample);
                } else {
                    i++;
                }
            } catch (Exception e) {
                LOG.error("Error building broker metric sample for {}.", Integer.valueOf(node.id()), e);
                i++;
            } catch (UnknownVersionException e2) {
                LOG.error("Unrecognized serde version detected during broker metric sampling.", e2);
                i++;
            }
        }
        return i;
    }
}
