package com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus;

import com.linkedin.cruisecontrol.common.config.ConfigDef;
import com.linkedin.cruisecontrol.common.config.ConfigException;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfigUtils;
import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.BrokerMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.PartitionMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.TopicMetric;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.AbstractMetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSamplerOptions;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.model.PrometheusQueryResult;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.model.PrometheusValue;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.http.HttpHost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/prometheus/PrometheusMetricSampler.class */
public class PrometheusMetricSampler extends AbstractMetricSampler {
    static final String PROMETHEUS_SERVER_ENDPOINT_CONFIG = "prometheus.server.endpoint";
    static final String PROMETHEUS_QUERY_RESOLUTION_STEP_MS_CONFIG = "prometheus.query.resolution.step.ms";
    static final String PROMETHEUS_QUERY_SUPPLIER_CONFIG = "prometheus.query.supplier";
    protected int _samplingIntervalMs;
    protected Map<String, Integer> _hostToBrokerIdMap = new HashMap();
    protected PrometheusAdapter _prometheusAdapter;
    protected Map<RawMetricType, String> _metricToPrometheusQueryMap;
    private CloseableHttpClient _httpClient;
    private static final Integer DEFAULT_PROMETHEUS_QUERY_RESOLUTION_STEP_MS = 60000;
    private static final Class<?> DEFAULT_PROMETHEUS_QUERY_SUPPLIER = DefaultPrometheusQuerySupplier.class;
    private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricSampler.class);

    /* renamed from: com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.PrometheusMetricSampler$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/prometheus/PrometheusMetricSampler$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$kafka$cruisecontrol$metricsreporter$metric$RawMetricType$MetricScope = new int[RawMetricType.MetricScope.values().length];

        static {
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$metricsreporter$metric$RawMetricType$MetricScope[RawMetricType.MetricScope.BROKER.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$metricsreporter$metric$RawMetricType$MetricScope[RawMetricType.MetricScope.TOPIC.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$metricsreporter$metric$RawMetricType$MetricScope[RawMetricType.MetricScope.PARTITION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.AbstractMetricSampler
    public void configure(Map<String, ?> map) {
        super.configure(map);
        configureSamplingInterval(map);
        configurePrometheusAdapter(map);
        configureQueryMap(map);
    }

    private void configureSamplingInterval(Map<String, ?> map) {
        this._samplingIntervalMs = DEFAULT_PROMETHEUS_QUERY_RESOLUTION_STEP_MS.intValue();
        if (map.containsKey(PROMETHEUS_QUERY_RESOLUTION_STEP_MS_CONFIG)) {
            String str = (String) map.get(PROMETHEUS_QUERY_RESOLUTION_STEP_MS_CONFIG);
            try {
                this._samplingIntervalMs = Integer.parseInt(str);
                if (this._samplingIntervalMs <= 0) {
                    throw new ConfigException(String.format("%s config should be set to positive, provided %d.", PROMETHEUS_QUERY_RESOLUTION_STEP_MS_CONFIG, Integer.valueOf(this._samplingIntervalMs)));
                }
            } catch (NumberFormatException e) {
                throw new ConfigException("%s config should be a positive number, provided %s", str);
            }
        }
    }

    private void configurePrometheusAdapter(Map<String, ?> map) {
        String str = (String) map.get(PROMETHEUS_SERVER_ENDPOINT_CONFIG);
        if (str == null) {
            throw new ConfigException(String.format("%s config is required by Prometheus metric sampler", PROMETHEUS_SERVER_ENDPOINT_CONFIG));
        }
        try {
            HttpHost create = HttpHost.create(str);
            if (create.getPort() < 0) {
                throw new IllegalArgumentException();
            }
            this._httpClient = HttpClients.createDefault();
            this._prometheusAdapter = new PrometheusAdapter(this._httpClient, create, this._samplingIntervalMs);
        } catch (IllegalArgumentException e) {
            throw new ConfigException(String.format("Prometheus endpoint URI is malformed, expected schema://host:port, provided %s", str));
        }
    }

    private void configureQueryMap(Map<String, ?> map) {
        String str = (String) map.get(PROMETHEUS_QUERY_SUPPLIER_CONFIG);
        Class<?> cls = DEFAULT_PROMETHEUS_QUERY_SUPPLIER;
        if (str != null) {
            cls = (Class) ConfigDef.parseType(PROMETHEUS_QUERY_SUPPLIER_CONFIG, str, ConfigDef.Type.CLASS);
            if (!PrometheusQuerySupplier.class.isAssignableFrom(cls)) {
                throw new ConfigException(String.format("Invalid %s is provided to prometheus metric sampler, provided %s", PROMETHEUS_QUERY_SUPPLIER_CONFIG, cls));
            }
        }
        this._metricToPrometheusQueryMap = ((PrometheusQuerySupplier) KafkaCruiseControlConfigUtils.getConfiguredInstance(cls, PrometheusQuerySupplier.class, Collections.emptyMap())).get();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        this._httpClient.close();
    }

    private Integer getBrokerIdForHostName(String str, Cluster cluster) {
        Integer num = this._hostToBrokerIdMap.get(str);
        if (num != null) {
            return num;
        }
        mapNodesToClusterId(cluster);
        return this._hostToBrokerIdMap.get(str);
    }

    private void mapNodesToClusterId(Cluster cluster) {
        for (Node node : cluster.nodes()) {
            this._hostToBrokerIdMap.put(node.host(), Integer.valueOf(node.id()));
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:13:0x009b. Please report as an issue. */
    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.AbstractMetricSampler
    protected int retrieveMetricsForProcessing(MetricSamplerOptions metricSamplerOptions) throws SamplingException {
        int i = 0;
        int i2 = 0;
        for (Map.Entry<RawMetricType, String> entry : this._metricToPrometheusQueryMap.entrySet()) {
            RawMetricType key = entry.getKey();
            String value = entry.getValue();
            try {
                for (PrometheusQueryResult prometheusQueryResult : this._prometheusAdapter.queryMetric(value, metricSamplerOptions.startTimeMs(), metricSamplerOptions.endTimeMs())) {
                    try {
                        switch (AnonymousClass1.$SwitchMap$com$linkedin$kafka$cruisecontrol$metricsreporter$metric$RawMetricType$MetricScope[key.metricScope().ordinal()]) {
                            case 1:
                                i += addBrokerMetrics(metricSamplerOptions.cluster(), key, prometheusQueryResult);
                                break;
                            case 2:
                                i += addTopicMetrics(metricSamplerOptions.cluster(), key, prometheusQueryResult);
                                break;
                            case 3:
                                i += addPartitionMetrics(metricSamplerOptions.cluster(), key, prometheusQueryResult);
                                break;
                        }
                    } catch (InvalidPrometheusResultException e) {
                        LOG.trace("Invalid query result received from Prometheus for query {}", value, e);
                        i2++;
                    }
                }
            } catch (IOException e2) {
                LOG.error("Error when attempting to query Prometheus metrics", e2);
                throw new SamplingException("Could not query metrics from Prometheus");
            }
        }
        LOG.info("Added {} metric values. Skipped {} invalid query results.", Integer.valueOf(i), Integer.valueOf(i2));
        return i;
    }

    private int addBrokerMetrics(Cluster cluster, RawMetricType rawMetricType, PrometheusQueryResult prometheusQueryResult) throws InvalidPrometheusResultException {
        int brokerId = getBrokerId(cluster, prometheusQueryResult);
        int i = 0;
        for (PrometheusValue prometheusValue : prometheusQueryResult.values()) {
            addMetricForProcessing(new BrokerMetric(rawMetricType, prometheusValue.epochSeconds() * 1000, brokerId, prometheusValue.value()));
            i++;
        }
        return i;
    }

    private int addTopicMetrics(Cluster cluster, RawMetricType rawMetricType, PrometheusQueryResult prometheusQueryResult) throws InvalidPrometheusResultException {
        int brokerId = getBrokerId(cluster, prometheusQueryResult);
        String topic = getTopic(prometheusQueryResult);
        int i = 0;
        for (PrometheusValue prometheusValue : prometheusQueryResult.values()) {
            addMetricForProcessing(new TopicMetric(rawMetricType, prometheusValue.epochSeconds() * 1000, brokerId, topic, prometheusValue.value()));
            i++;
        }
        return i;
    }

    private int addPartitionMetrics(Cluster cluster, RawMetricType rawMetricType, PrometheusQueryResult prometheusQueryResult) throws InvalidPrometheusResultException {
        int brokerId = getBrokerId(cluster, prometheusQueryResult);
        String topic = getTopic(prometheusQueryResult);
        int partition = getPartition(prometheusQueryResult);
        int i = 0;
        for (PrometheusValue prometheusValue : prometheusQueryResult.values()) {
            addMetricForProcessing(new PartitionMetric(rawMetricType, prometheusValue.epochSeconds() * 1000, brokerId, topic, partition, prometheusValue.value()));
            i++;
        }
        return i;
    }

    private int getBrokerId(Cluster cluster, PrometheusQueryResult prometheusQueryResult) throws InvalidPrometheusResultException {
        String instance = prometheusQueryResult.metric().instance();
        if (instance == null) {
            throw new InvalidPrometheusResultException("Instance returned as part of Prometheus API response is null.");
        }
        String str = instance.split(":")[0];
        Integer brokerIdForHostName = getBrokerIdForHostName(str, cluster);
        if (brokerIdForHostName == null) {
            throw new InvalidPrometheusResultException(String.format("Unexpected host %s, does not map to any of broker found from Kafka cluster metadata. Brokers found in Kafka cluster metadata = %s", str, this._hostToBrokerIdMap.keySet()));
        }
        return brokerIdForHostName.intValue();
    }

    private String getTopic(PrometheusQueryResult prometheusQueryResult) throws InvalidPrometheusResultException {
        String str = prometheusQueryResult.metric().topic();
        if (str == null) {
            throw new InvalidPrometheusResultException("Topic was not returned as part of Prometheus API response.");
        }
        return str;
    }

    private int getPartition(PrometheusQueryResult prometheusQueryResult) throws InvalidPrometheusResultException {
        String partition = prometheusQueryResult.metric().partition();
        if (partition == null) {
            throw new InvalidPrometheusResultException("Partition was not returned as part of Prometheus API response.");
        }
        try {
            return Integer.parseInt(partition);
        } catch (NumberFormatException e) {
            throw new InvalidPrometheusResultException("Partition returned as part of Prometheus API response was not a number.");
        }
    }
}
