/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus;

import com.linkedin.cruisecontrol.common.config.ConfigDef;
import com.linkedin.cruisecontrol.common.config.ConfigException;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfigUtils;
import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.BrokerMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.PartitionMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.TopicMetric;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.AbstractMetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSamplerOptions;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.DefaultPrometheusQuerySupplier;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.InvalidPrometheusResultException;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.PrometheusAdapter;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.PrometheusQuerySupplier;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.model.PrometheusQueryResult;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.prometheus.model.PrometheusValue;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpHost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Metric sampler that obtains Kafka metrics by querying a Prometheus server.
 *
 * <p>For every {@link RawMetricType} supplied by the configured {@link PrometheusQuerySupplier}, the
 * associated query is executed through a {@link PrometheusAdapter} for the requested sampling window,
 * and each returned series is converted into broker, topic or partition metrics.
 *
 * <p>Recognized configuration keys:
 * <ul>
 *   <li>{@code prometheus.server.endpoint} (required): endpoint of the Prometheus server; it must
 *       include an explicit port.</li>
 *   <li>{@code prometheus.query.resolution.step.ms} (optional): positive integer, defaults to 60000.</li>
 *   <li>{@code prometheus.query.supplier} (optional): class name of a {@link PrometheusQuerySupplier},
 *       defaults to {@link DefaultPrometheusQuerySupplier}.</li>
 * </ul>
 */
public class PrometheusMetricSampler
extends AbstractMetricSampler {
    static final String PROMETHEUS_SERVER_ENDPOINT_CONFIG = "prometheus.server.endpoint";
    static final String PROMETHEUS_QUERY_RESOLUTION_STEP_MS_CONFIG = "prometheus.query.resolution.step.ms";
    private static final int DEFAULT_PROMETHEUS_QUERY_RESOLUTION_STEP_MS = 60000;
    static final String PROMETHEUS_QUERY_SUPPLIER_CONFIG = "prometheus.query.supplier";
    private static final Class<?> DEFAULT_PROMETHEUS_QUERY_SUPPLIER = DefaultPrometheusQuerySupplier.class;
    private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricSampler.class);
    protected int _samplingIntervalMs;
    // Cache of broker host name -> broker id, filled lazily from the cluster metadata.
    protected Map<String, Integer> _hostToBrokerIdMap = new HashMap<>();
    protected PrometheusAdapter _prometheusAdapter;
    protected Map<RawMetricType, String> _metricToPrometheusQueryMap;
    // Remains null until configurePrometheusAdapter has succeeded.
    private CloseableHttpClient _httpClient;

    /**
     * Configures the parent sampler, then the query resolution step, the Prometheus adapter and the
     * metric-to-query map, in that order (the adapter is constructed with the resolution step).
     *
     * @param configs the sampler configuration.
     * @throws ConfigException if a required config is missing or a provided config is invalid.
     */
    @Override
    public void configure(Map<String, ?> configs) {
        super.configure(configs);
        this.configureSamplingInterval(configs);
        this.configurePrometheusAdapter(configs);
        this.configureQueryMap(configs);
    }

    /**
     * Reads the query resolution step from the configs, falling back to the default when the key is absent.
     *
     * @param configs the sampler configuration.
     * @throws ConfigException if the provided value is not a positive integer.
     */
    private void configureSamplingInterval(Map<String, ?> configs) {
        this._samplingIntervalMs = DEFAULT_PROMETHEUS_QUERY_RESOLUTION_STEP_MS;
        if (!configs.containsKey(PROMETHEUS_QUERY_RESOLUTION_STEP_MS_CONFIG)) {
            return;
        }
        // The map is typed Map<String, ?>, so the value may be a String or a boxed number;
        // go through its string form rather than casting blindly.
        Object rawValue = configs.get(PROMETHEUS_QUERY_RESOLUTION_STEP_MS_CONFIG);
        String samplingIntervalMsString = rawValue == null ? null : String.valueOf(rawValue);
        try {
            // Integer.parseInt(null) throws NumberFormatException, so a null value is rejected below.
            this._samplingIntervalMs = Integer.parseInt(samplingIntervalMsString);
        }
        catch (NumberFormatException e) {
            throw new ConfigException(String.format("%s config should be a positive number, provided %s",
                    PROMETHEUS_QUERY_RESOLUTION_STEP_MS_CONFIG, samplingIntervalMsString));
        }
        if (this._samplingIntervalMs <= 0) {
            throw new ConfigException(String.format("%s config should be set to positive, provided %d.",
                    PROMETHEUS_QUERY_RESOLUTION_STEP_MS_CONFIG, this._samplingIntervalMs));
        }
    }

    /**
     * Creates the HTTP client and the {@link PrometheusAdapter} for the configured endpoint.
     *
     * @param configs the sampler configuration.
     * @throws ConfigException if the endpoint is missing, malformed, or has no explicit port.
     */
    private void configurePrometheusAdapter(Map<String, ?> configs) {
        String endpoint = (String) configs.get(PROMETHEUS_SERVER_ENDPOINT_CONFIG);
        if (endpoint == null) {
            throw new ConfigException(String.format("%s config is required by Prometheus metric sampler", PROMETHEUS_SERVER_ENDPOINT_CONFIG));
        }
        try {
            HttpHost host = HttpHost.create(endpoint);
            if (host.getPort() < 0) {
                // A missing port is reported exactly like a malformed endpoint by the catch below.
                throw new IllegalArgumentException();
            }
            this._httpClient = HttpClients.createDefault();
            this._prometheusAdapter = new PrometheusAdapter(this._httpClient, host, this._samplingIntervalMs);
        }
        catch (IllegalArgumentException ex) {
            throw new ConfigException(String.format("Prometheus endpoint URI is malformed, expected schema://host:port, provided %s", endpoint));
        }
    }

    /**
     * Instantiates the configured (or default) {@link PrometheusQuerySupplier} and stores the
     * metric-to-query map it supplies.
     *
     * @param configs the sampler configuration.
     * @throws ConfigException if the configured class is not a {@link PrometheusQuerySupplier}.
     */
    private void configureQueryMap(Map<String, ?> configs) {
        String prometheusQuerySupplierClassName = (String) configs.get(PROMETHEUS_QUERY_SUPPLIER_CONFIG);
        Class<?> prometheusQuerySupplierClass = DEFAULT_PROMETHEUS_QUERY_SUPPLIER;
        if (prometheusQuerySupplierClassName != null) {
            prometheusQuerySupplierClass = (Class<?>) ConfigDef.parseType(PROMETHEUS_QUERY_SUPPLIER_CONFIG, prometheusQuerySupplierClassName, ConfigDef.Type.CLASS);
            if (!PrometheusQuerySupplier.class.isAssignableFrom(prometheusQuerySupplierClass)) {
                throw new ConfigException(String.format("Invalid %s is provided to prometheus metric sampler, provided %s", PROMETHEUS_QUERY_SUPPLIER_CONFIG, prometheusQuerySupplierClass));
            }
        }
        PrometheusQuerySupplier prometheusQuerySupplier = KafkaCruiseControlConfigUtils.getConfiguredInstance(prometheusQuerySupplierClass, PrometheusQuerySupplier.class, Collections.emptyMap());
        // NOTE(review): the cast mirrors the original code; it is presumably redundant if the supplier
        // is already typed to return Map<RawMetricType, String> - confirm against PrometheusQuerySupplier.
        @SuppressWarnings("unchecked")
        Map<RawMetricType, String> metricToQueryMap = (Map<RawMetricType, String>) prometheusQuerySupplier.get();
        this._metricToPrometheusQueryMap = metricToQueryMap;
    }

    /**
     * Closes the HTTP client, if one has been created.
     *
     * @throws IOException if closing the HTTP client fails.
     */
    @Override
    public void close() throws IOException {
        // The client is only created in configure(); guard against close() on an unconfigured
        // (or partially configured) sampler.
        if (this._httpClient != null) {
            this._httpClient.close();
        }
    }

    /**
     * Resolves a broker id for the given host, refreshing the cache from the cluster on a miss.
     *
     * @param host the broker host name.
     * @param cluster the cluster whose nodes are used to refresh the cache.
     * @return the broker id, or {@code null} if no node of the cluster has that host name.
     */
    private Integer getBrokerIdForHostName(String host, Cluster cluster) {
        Integer cachedId = this._hostToBrokerIdMap.get(host);
        if (cachedId != null) {
            return cachedId;
        }
        this.mapNodesToClusterId(cluster);
        return this._hostToBrokerIdMap.get(host);
    }

    /**
     * Records the host name to broker id mapping of every node in the given cluster.
     *
     * @param cluster the cluster whose nodes are added to the cache.
     */
    private void mapNodesToClusterId(Cluster cluster) {
        for (Node node : cluster.nodes()) {
            this._hostToBrokerIdMap.put(node.host(), node.id());
        }
    }

    /**
     * Runs every configured Prometheus query for the sampling window and registers the resulting
     * metrics for processing. Results that cannot be mapped to a broker/topic/partition are skipped
     * and counted rather than failing the whole round.
     *
     * @param metricSamplerOptions the sampling options providing the cluster and the time window.
     * @return the number of metric values added.
     * @throws SamplingException if a query to Prometheus fails with an {@link IOException}.
     */
    @Override
    protected int retrieveMetricsForProcessing(MetricSamplerOptions metricSamplerOptions) throws SamplingException {
        int metricsAdded = 0;
        int resultsSkipped = 0;
        for (Map.Entry<RawMetricType, String> metricToQueryEntry : this._metricToPrometheusQueryMap.entrySet()) {
            RawMetricType metricType = metricToQueryEntry.getKey();
            String prometheusQuery = metricToQueryEntry.getValue();
            List<PrometheusQueryResult> prometheusQueryResults;
            try {
                prometheusQueryResults = this._prometheusAdapter.queryMetric(prometheusQuery, metricSamplerOptions.startTimeMs(), metricSamplerOptions.endTimeMs());
            }
            catch (IOException e) {
                // The cause is logged here because the thrown exception carries only a message.
                LOG.error("Error when attempting to query Prometheus metrics", e);
                throw new SamplingException("Could not query metrics from Prometheus");
            }
            for (PrometheusQueryResult result : prometheusQueryResults) {
                try {
                    switch (metricType.metricScope()) {
                        case BROKER: {
                            metricsAdded += this.addBrokerMetrics(metricSamplerOptions.cluster(), metricType, result);
                            break;
                        }
                        case TOPIC: {
                            metricsAdded += this.addTopicMetrics(metricSamplerOptions.cluster(), metricType, result);
                            break;
                        }
                        case PARTITION: {
                            metricsAdded += this.addPartitionMetrics(metricSamplerOptions.cluster(), metricType, result);
                            break;
                        }
                        default: {
                            // Any other scope is ignored.
                            break;
                        }
                    }
                }
                catch (InvalidPrometheusResultException e) {
                    LOG.trace("Invalid query result received from Prometheus for query {}", prometheusQuery, e);
                    ++resultsSkipped;
                }
            }
        }
        LOG.info("Added {} metric values. Skipped {} invalid query results.", metricsAdded, resultsSkipped);
        return metricsAdded;
    }

    /**
     * Adds one {@link BrokerMetric} per value of the query result.
     *
     * @return the number of metrics added.
     * @throws InvalidPrometheusResultException if the result cannot be mapped to a broker.
     */
    private int addBrokerMetrics(Cluster cluster, RawMetricType metricType, PrometheusQueryResult queryResult) throws InvalidPrometheusResultException {
        int brokerId = this.getBrokerId(cluster, queryResult);
        int metricsAdded = 0;
        for (PrometheusValue value : queryResult.values()) {
            // Prometheus timestamps are in seconds; the metric is constructed with milliseconds.
            this.addMetricForProcessing(new BrokerMetric(metricType, value.epochSeconds() * 1000L, brokerId, value.value()));
            ++metricsAdded;
        }
        return metricsAdded;
    }

    /**
     * Adds one {@link TopicMetric} per value of the query result.
     *
     * @return the number of metrics added.
     * @throws InvalidPrometheusResultException if the result cannot be mapped to a broker or lacks a topic.
     */
    private int addTopicMetrics(Cluster cluster, RawMetricType metricType, PrometheusQueryResult queryResult) throws InvalidPrometheusResultException {
        int brokerId = this.getBrokerId(cluster, queryResult);
        String topic = this.getTopic(queryResult);
        int metricsAdded = 0;
        for (PrometheusValue value : queryResult.values()) {
            this.addMetricForProcessing(new TopicMetric(metricType, value.epochSeconds() * 1000L, brokerId, topic, value.value()));
            ++metricsAdded;
        }
        return metricsAdded;
    }

    /**
     * Adds one {@link PartitionMetric} per value of the query result.
     *
     * @return the number of metrics added.
     * @throws InvalidPrometheusResultException if the result cannot be mapped to a broker, or lacks a
     *         topic or a numeric partition.
     */
    private int addPartitionMetrics(Cluster cluster, RawMetricType metricType, PrometheusQueryResult queryResult) throws InvalidPrometheusResultException {
        int brokerId = this.getBrokerId(cluster, queryResult);
        String topic = this.getTopic(queryResult);
        int partition = this.getPartition(queryResult);
        int metricsAdded = 0;
        for (PrometheusValue value : queryResult.values()) {
            this.addMetricForProcessing(new PartitionMetric(metricType, value.epochSeconds() * 1000L, brokerId, topic, partition, value.value()));
            ++metricsAdded;
        }
        return metricsAdded;
    }

    /**
     * Maps the {@code instance} label of a query result to a broker id.
     *
     * @param cluster the cluster used to resolve host names to broker ids.
     * @param queryResult the Prometheus query result.
     * @return the id of the broker whose host name matches the instance's host part.
     * @throws InvalidPrometheusResultException if the instance is null or matches no known broker.
     */
    private int getBrokerId(Cluster cluster, PrometheusQueryResult queryResult) throws InvalidPrometheusResultException {
        String hostPort = queryResult.metric().instance();
        if (hostPort == null) {
            throw new InvalidPrometheusResultException("Instance returned as part of Prometheus API response is null.");
        }
        // Take everything before the first ':' as the host. indexOf/substring is used instead of
        // split(":")[0] because split yields an empty array for inputs such as ":", which would
        // surface as an ArrayIndexOutOfBoundsException instead of a skippable invalid result.
        int separatorIndex = hostPort.indexOf(':');
        String hostName = separatorIndex < 0 ? hostPort : hostPort.substring(0, separatorIndex);
        Integer brokerId = this.getBrokerIdForHostName(hostName, cluster);
        if (brokerId == null) {
            throw new InvalidPrometheusResultException(String.format("Unexpected host %s, does not map to any of broker found from Kafka cluster metadata. Brokers found in Kafka cluster metadata = %s", hostName, this._hostToBrokerIdMap.keySet()));
        }
        return brokerId;
    }

    /**
     * Extracts the topic label of a query result.
     *
     * @throws InvalidPrometheusResultException if the result carries no topic.
     */
    private String getTopic(PrometheusQueryResult queryResult) throws InvalidPrometheusResultException {
        String topic = queryResult.metric().topic();
        if (topic == null) {
            throw new InvalidPrometheusResultException("Topic was not returned as part of Prometheus API response.");
        }
        return topic;
    }

    /**
     * Extracts the partition label of a query result as an integer.
     *
     * @throws InvalidPrometheusResultException if the result carries no partition or it is not an integer.
     */
    private int getPartition(PrometheusQueryResult queryResult) throws InvalidPrometheusResultException {
        String partitionString = queryResult.metric().partition();
        if (partitionString == null) {
            throw new InvalidPrometheusResultException("Partition was not returned as part of Prometheus API response.");
        }
        try {
            return Integer.parseInt(partitionString);
        }
        catch (NumberFormatException e) {
            throw new InvalidPrometheusResultException("Partition returned as part of Prometheus API response was not a number.");
        }
    }
}

