package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.constants.MonitorConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.UnknownVersionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import com.linkedin.kafka.cruisecontrol.model.ModelUtils;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerLoad;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/SamplingUtils.class */
public class SamplingUtils {
    private static final String SKIP_BUILDING_SAMPLE_PREFIX = "Skip generating metric sample for ";
    public static final int UNRECOGNIZED_BROKER_ID = -1;
    private static final Logger LOG = LoggerFactory.getLogger(SamplingUtils.class);
    public static final Random RANDOM = new Random();

    private SamplingUtils() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Map<Integer, Map<String, Integer>> leaderDistribution(Cluster cluster) {
        List<Node> nodes = cluster.nodes();
        HashMap hashMap = new HashMap(nodes.size());
        for (Node node : nodes) {
            HashMap hashMap2 = new HashMap();
            hashMap.put(Integer.valueOf(node.id()), hashMap2);
            cluster.partitionsForNode(node.id()).forEach(partitionInfo -> {
                hashMap2.merge(partitionInfo.topic(), 1, (v0, v1) -> {
                    return Integer.sum(v0, v1);
                });
            });
        }
        return hashMap;
    }

    private static Double estimateLeaderCpuUtil(PartitionMetricSample partitionMetricSample, BrokerLoad brokerLoad, MetricDef metricDef, short s) {
        Double estimateLeaderCpuUtilPerCore = ModelUtils.estimateLeaderCpuUtilPerCore(brokerLoad.brokerMetric(RawMetricType.BROKER_CPU_UTIL), brokerLoad.brokerMetric(RawMetricType.ALL_TOPIC_BYTES_IN), brokerLoad.brokerMetric(RawMetricType.ALL_TOPIC_BYTES_OUT) + brokerLoad.brokerMetric(RawMetricType.ALL_TOPIC_REPLICATION_BYTES_OUT), brokerLoad.brokerMetric(RawMetricType.ALL_TOPIC_REPLICATION_BYTES_IN), partitionMetricSample.metricValue(metricDef.metricInfo(KafkaMetricDef.LEADER_BYTES_IN.name()).id()).doubleValue(), partitionMetricSample.metricValue(metricDef.metricInfo(KafkaMetricDef.LEADER_BYTES_OUT.name()).id()).doubleValue() + partitionMetricSample.metricValue(metricDef.metricInfo(KafkaMetricDef.REPLICATION_BYTES_OUT_RATE.name()).id()).doubleValue());
        if (estimateLeaderCpuUtilPerCore != null) {
            return Double.valueOf(s * estimateLeaderCpuUtilPerCore.doubleValue());
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static double estimateLeaderCpuUtilUsingLinearRegressionModel(PartitionMetricSample partitionMetricSample) {
        List<Short> resourceToMetricIds = KafkaMetricDef.resourceToMetricIds(Resource.CPU);
        List<Short> resourceToMetricIds2 = KafkaMetricDef.resourceToMetricIds(Resource.NW_OUT);
        return ModelUtils.estimateLeaderCpuUtilUsingLinearRegressionModel(sumOfMetrics(partitionMetricSample, resourceToMetricIds).doubleValue(), sumOfMetrics(partitionMetricSample, resourceToMetricIds2).doubleValue());
    }

    private static Double sumOfMetrics(PartitionMetricSample partitionMetricSample, Collection<Short> collection) {
        double d = 0.0d;
        Iterator<Short> it = collection.iterator();
        while (it.hasNext()) {
            d += partitionMetricSample.metricValue(it.next().shortValue()).doubleValue();
        }
        return Double.valueOf(d);
    }

    private static TopicPartition partitionHandleDotInTopicName(TopicPartition topicPartition) {
        return !topicPartition.topic().contains(".") ? topicPartition : new TopicPartition(replaceDotsWithUnderscores(topicPartition.topic()), topicPartition.partition());
    }

    public static String replaceDotsWithUnderscores(String str) {
        return !str.contains(".") ? str : str.replace('.', '_');
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static PartitionMetricSample buildPartitionMetricSample(Cluster cluster, Map<Integer, Map<String, Integer>> map, TopicPartition topicPartition, Map<Integer, BrokerLoad> map2, long j, Map<Integer, Short> map3, Map<Integer, Integer> map4) {
        Node leaderFor = cluster.leaderFor(topicPartition);
        if (leaderFor == null) {
            LOG.trace("Partition {} has no current leader.", topicPartition);
            map4.merge(-1, 1, (v0, v1) -> {
                return Integer.sum(v0, v1);
            });
            return null;
        }
        int id = leaderFor.id();
        BrokerLoad brokerLoad = map2.get(Integer.valueOf(id));
        TopicPartition partitionHandleDotInTopicName = partitionHandleDotInTopicName(topicPartition);
        if (skipBuildingPartitionMetricSample(topicPartition, partitionHandleDotInTopicName, id, brokerLoad, map3)) {
            map4.merge(Integer.valueOf(id), 1, (v0, v1) -> {
                return Integer.sum(v0, v1);
            });
            return null;
        }
        MetricDef commonMetricDef = KafkaMetricDef.commonMetricDef();
        PartitionMetricSample partitionMetricSample = new PartitionMetricSample(id, topicPartition);
        int intValue = map.get(Integer.valueOf(id)).get(topicPartition.topic()).intValue();
        for (RawMetricType rawMetricType : RawMetricType.topicMetricTypes()) {
            partitionMetricSample.record(commonMetricDef.metricInfo(KafkaMetricDef.forRawMetricType(rawMetricType).name()), intValue == 0 ? 0.0d : brokerLoad.topicMetrics(partitionHandleDotInTopicName.topic(), rawMetricType) / intValue);
        }
        Double partitionMetric = brokerLoad.partitionMetric(partitionHandleDotInTopicName.topic(), partitionHandleDotInTopicName.partition(), RawMetricType.PARTITION_SIZE);
        if (partitionMetric == null) {
            map4.merge(Integer.valueOf(id), 1, (v0, v1) -> {
                return Integer.sum(v0, v1);
            });
            return null;
        }
        partitionMetricSample.record(commonMetricDef.metricInfo(KafkaMetricDef.DISK_USAGE.name()), partitionMetric.doubleValue());
        Double estimateLeaderCpuUtil = estimateLeaderCpuUtil(partitionMetricSample, brokerLoad, commonMetricDef, map3.get(Integer.valueOf(id)).shortValue());
        if (estimateLeaderCpuUtil == null) {
            map4.merge(Integer.valueOf(id), 1, (v0, v1) -> {
                return Integer.sum(v0, v1);
            });
            return null;
        }
        partitionMetricSample.record(commonMetricDef.metricInfo(KafkaMetricDef.CPU_USAGE.name()), estimateLeaderCpuUtil.doubleValue());
        partitionMetricSample.close(j);
        return partitionMetricSample;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static BrokerMetricSample buildBrokerMetricSample(Node node, Map<Integer, BrokerLoad> map, long j) throws UnknownVersionException {
        BrokerLoad brokerLoad = map.get(Integer.valueOf(node.id()));
        if (skipBuildingBrokerMetricSample(brokerLoad, node.id())) {
            return null;
        }
        MetricDef brokerMetricDef = KafkaMetricDef.brokerMetricDef();
        BrokerMetricSample brokerMetricSample = new BrokerMetricSample(node.host(), Integer.valueOf(node.id()), brokerLoad.brokerSampleDeserializationVersion());
        for (Map.Entry entry : RawMetricType.brokerMetricTypesDiffByVersion().entrySet()) {
            for (RawMetricType rawMetricType : (Set) entry.getValue()) {
                if (!brokerLoad.brokerMetricAvailable(rawMetricType)) {
                    LOG.warn("{}broker {} because it does not have {} metrics (serde version {}) or the metrics are inconsistent.", new Object[]{SKIP_BUILDING_SAMPLE_PREFIX, Integer.valueOf(node.id()), rawMetricType, entry.getKey()});
                    return null;
                }
                brokerMetricSample.record(brokerMetricDef.metricInfo(KafkaMetricDef.forRawMetricType(rawMetricType).name()), brokerLoad.brokerMetric(rawMetricType));
            }
        }
        brokerMetricSample.record(brokerMetricDef.metricInfo(KafkaMetricDef.DISK_USAGE.name()), brokerLoad.diskUsage());
        brokerMetricSample.close(j);
        return brokerMetricSample;
    }

    private static boolean skipBuildingPartitionMetricSample(TopicPartition topicPartition, TopicPartition topicPartition2, int i, BrokerLoad brokerLoad, Map<Integer, Short> map) {
        if (brokerLoad == null || !brokerLoad.brokerMetricAvailable(RawMetricType.BROKER_CPU_UTIL)) {
            LOG.debug("{}partition {} because {} metric for broker {} is unavailable.", new Object[]{SKIP_BUILDING_SAMPLE_PREFIX, topicPartition, RawMetricType.BROKER_CPU_UTIL, Integer.valueOf(i)});
            return true;
        }
        if (map.get(Integer.valueOf(i)) == null) {
            LOG.debug("{}partition {} because the number of CPU cores of its leader broker {} is unavailable. Please ensure that either the broker capacity config resolver provides the number of CPU cores without estimation or allow CPU capacity estimation during sampling (i.e. set {} to true).", new Object[]{SKIP_BUILDING_SAMPLE_PREFIX, topicPartition, Integer.valueOf(i), MonitorConfig.SAMPLING_ALLOW_CPU_CAPACITY_ESTIMATION_CONFIG});
            return true;
        }
        if (brokerLoad.allDotHandledTopicMetricsAvailable(topicPartition2.topic())) {
            return false;
        }
        LOG.debug("{}partition {} because broker {} has no metric or topic metrics are not available", new Object[]{SKIP_BUILDING_SAMPLE_PREFIX, topicPartition, Integer.valueOf(i)});
        return true;
    }

    private static boolean skipBuildingBrokerMetricSample(BrokerLoad brokerLoad, int i) {
        if (brokerLoad == null) {
            LOG.warn("{}broker {} because all broker metrics are missing.", SKIP_BUILDING_SAMPLE_PREFIX, Integer.valueOf(i));
            return true;
        }
        if (brokerLoad.minRequiredBrokerMetricsAvailable()) {
            return false;
        }
        if (brokerLoad.missingBrokerMetricsInMinSupportedVersion().size() == 0) {
            LOG.warn("{}broker {} because there are not enough topic metrics to generate broker metrics.", SKIP_BUILDING_SAMPLE_PREFIX, Integer.valueOf(i));
            return true;
        }
        LOG.warn("{}broker {} because the following required metrics are missing {}.", new Object[]{SKIP_BUILDING_SAMPLE_PREFIX, Integer.valueOf(i), brokerLoad.missingBrokerMetricsInMinSupportedVersion()});
        return true;
    }

    public static Consumer<String, CruiseControlMetric> createMetricConsumer(Map<String, ?> map, String str) {
        String str2 = (String) map.get(CruiseControlMetricsReporterSampler.METRIC_REPORTER_SAMPLER_BOOTSTRAP_SERVERS);
        if (str2 == null) {
            str2 = bootstrapServers(map);
        }
        long nextLong = RANDOM.nextLong();
        Properties properties = new Properties();
        properties.putAll(map);
        properties.setProperty(MonitorConfig.BOOTSTRAP_SERVERS_CONFIG, str2);
        properties.setProperty(MonitorConfig.CLIENT_ID_CONFIG, str + "-consumer-" + nextLong);
        properties.setProperty("auto.offset.reset", "latest");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("max.poll.records", Integer.toString(Integer.MAX_VALUE));
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", MetricSerde.class.getName());
        properties.setProperty(MonitorConfig.RECONNECT_BACKOFF_MS_CONFIG, map.get(MonitorConfig.RECONNECT_BACKOFF_MS_CONFIG).toString());
        return new KafkaConsumer(properties);
    }

    public static String bootstrapServers(Map<String, ?> map) {
        return String.join(",", (List) map.get(MonitorConfig.BOOTSTRAP_SERVERS_CONFIG));
    }

    public static KafkaConsumer<byte[], byte[]> createSampleStoreConsumer(Map<String, ?> map, String str) {
        long nextLong = RANDOM.nextLong();
        Properties properties = new Properties();
        properties.putAll(map);
        properties.setProperty(MonitorConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(map));
        properties.setProperty(MonitorConfig.CLIENT_ID_CONFIG, str + "-consumer-" + nextLong);
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("max.poll.records", Integer.toString(Integer.MAX_VALUE));
        properties.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        properties.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        properties.setProperty(MonitorConfig.RECONNECT_BACKOFF_MS_CONFIG, map.get(MonitorConfig.RECONNECT_BACKOFF_MS_CONFIG).toString());
        return new KafkaConsumer<>(properties);
    }
}
