package com.linkedin.kafka.cruisecontrol.metricsreporter;

import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.CruiseControlMetricsReporterException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricsUtils;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.TopicMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.YammerMetricProcessor;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricsRegistry;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.log.LogConfig;
import kafka.metrics.KafkaYammerMetrics;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.ReassignmentInProgressException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.KafkaThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.class */
public class CruiseControlMetricsReporter implements MetricsReporter, Runnable {
    private YammerMetricProcessor _yammerMetricProcessor;
    private KafkaThread _metricsReporterRunner;
    private KafkaProducer<String, CruiseControlMetric> _producer;
    private String _cruiseControlMetricsTopic;
    private long _reportingIntervalMs;
    private int _brokerId;
    private NewTopic _metricsTopic;
    private AdminClient _adminClient;
    private long _metricsTopicAutoCreateTimeoutMs;
    private int _metricsTopicAutoCreateRetries;
    protected static final String CRUISE_CONTROL_METRICS_TOPIC_CLEAN_UP_POLICY = "delete";
    private boolean _kubernetesMode;
    private MetricsRegistry _metricsRegistry;
    private static final Logger LOG = LoggerFactory.getLogger(CruiseControlMetricsReporter.class);
    protected static final Duration PRODUCER_CLOSE_TIMEOUT = Duration.ofSeconds(5);
    private Map<MetricName, KafkaMetric> _interestedMetrics = new ConcurrentHashMap();
    private long _lastReportingTime = System.currentTimeMillis();
    private int _numMetricSendFailure = 0;
    private volatile boolean _shutdown = false;

    public void init(List<KafkaMetric> list) {
        Iterator<KafkaMetric> it = list.iterator();
        while (it.hasNext()) {
            addMetricIfInterested(it.next());
        }
        LOG.info("Added {} Kafka metrics for Cruise Control metrics during initialization.", Integer.valueOf(this._interestedMetrics.size()));
        this._metricsReporterRunner = new KafkaThread("CruiseControlMetricsReporterRunner", this, true);
        this._yammerMetricProcessor = new YammerMetricProcessor();
        this._metricsReporterRunner.start();
        this._metricsRegistry = metricsRegistry();
    }

    public void metricChange(KafkaMetric kafkaMetric) {
        addMetricIfInterested(kafkaMetric);
    }

    public void metricRemoval(KafkaMetric kafkaMetric) {
        this._interestedMetrics.remove(kafkaMetric.metricName());
    }

    public void close() {
        LOG.info("Closing Cruise Control metrics reporter.");
        this._shutdown = true;
        if (this._metricsReporterRunner != null) {
            this._metricsReporterRunner.interrupt();
        }
        if (this._producer != null) {
            this._producer.close(PRODUCER_CLOSE_TIMEOUT);
        }
    }

    public void configure(Map<String, ?> map) {
        Properties parseProducerConfigs = CruiseControlMetricsReporterConfig.parseProducerConfigs(map);
        if (!parseProducerConfigs.containsKey("bootstrap.servers")) {
            Object obj = map.get("port");
            String str = "localhost:" + (obj == null ? "9092" : String.valueOf(obj));
            parseProducerConfigs.put("bootstrap.servers", str);
            LOG.info("Using default value of {} for {}", str, CruiseControlMetricsReporterConfig.config("bootstrap.servers"));
        }
        if (!parseProducerConfigs.containsKey("security.protocol")) {
            parseProducerConfigs.put("security.protocol", "PLAINTEXT");
            LOG.info("Using default value of {} for {}", "PLAINTEXT", CruiseControlMetricsReporterConfig.config("security.protocol"));
        }
        CruiseControlMetricsReporterConfig cruiseControlMetricsReporterConfig = new CruiseControlMetricsReporterConfig(map, false);
        setIfAbsent(parseProducerConfigs, "client.id", cruiseControlMetricsReporterConfig.getString(CruiseControlMetricsReporterConfig.config("client.id")));
        setIfAbsent(parseProducerConfigs, "linger.ms", cruiseControlMetricsReporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG).toString());
        setIfAbsent(parseProducerConfigs, "batch.size", cruiseControlMetricsReporterConfig.getInt(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_BATCH_SIZE_CONFIG).toString());
        setIfAbsent(parseProducerConfigs, "retries", "5");
        setIfAbsent(parseProducerConfigs, "compression.type", "gzip");
        setIfAbsent(parseProducerConfigs, "key.serializer", StringSerializer.class.getName());
        setIfAbsent(parseProducerConfigs, "value.serializer", MetricSerde.class.getName());
        setIfAbsent(parseProducerConfigs, "acks", "all");
        this._producer = new KafkaProducer<>(parseProducerConfigs);
        this._brokerId = Integer.parseInt((String) map.get(KafkaConfig.BrokerIdProp()));
        this._cruiseControlMetricsTopic = cruiseControlMetricsReporterConfig.getString(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG);
        this._reportingIntervalMs = cruiseControlMetricsReporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG).longValue();
        this._kubernetesMode = cruiseControlMetricsReporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_KUBERNETES_MODE_CONFIG).booleanValue();
        if (cruiseControlMetricsReporterConfig.getBoolean(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG).booleanValue()) {
            try {
                this._metricsTopic = createMetricsTopicFromReporterConfig(cruiseControlMetricsReporterConfig);
                this._adminClient = CruiseControlMetricsUtils.createAdminClient(CruiseControlMetricsUtils.addSslConfigs(parseProducerConfigs, cruiseControlMetricsReporterConfig));
                this._metricsTopicAutoCreateTimeoutMs = cruiseControlMetricsReporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS_CONFIG).longValue();
                this._metricsTopicAutoCreateRetries = cruiseControlMetricsReporterConfig.getInt(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_RETRIES_CONFIG).intValue();
            } catch (CruiseControlMetricsReporterException e) {
                LOG.warn("Cruise Control metrics topic auto creation was disabled", e);
            }
        }
    }

    private static MetricsRegistry metricsRegistry() {
        try {
            Class.forName("kafka.metrics.KafkaYammerMetrics");
            LOG.info("KafkaYammerMetrics found and will be used.");
            return KafkaYammerMetrics.defaultRegistry();
        } catch (ClassNotFoundException e) {
            LOG.info("KafkaYammerMetrics not found. Metrics will be used.");
            return Metrics.defaultRegistry();
        }
    }

    protected NewTopic createMetricsTopicFromReporterConfig(CruiseControlMetricsReporterConfig cruiseControlMetricsReporterConfig) throws CruiseControlMetricsReporterException {
        String string = cruiseControlMetricsReporterConfig.getString(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG);
        Integer num = cruiseControlMetricsReporterConfig.getInt(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG);
        Short sh = cruiseControlMetricsReporterConfig.getShort(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG);
        Short sh2 = cruiseControlMetricsReporterConfig.getShort(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_MIN_INSYNC_REPLICAS_CONFIG);
        if (sh.shortValue() <= 0 || num.intValue() <= 0) {
            throw new CruiseControlMetricsReporterException("The topic configuration must explicitly set the replication factor and the num partitions");
        }
        NewTopic newTopic = new NewTopic(string, num.intValue(), sh.shortValue());
        HashMap hashMap = new HashMap(sh2.shortValue() > 0 ? 3 : 2);
        hashMap.put(LogConfig.RetentionMsProp(), Long.toString(cruiseControlMetricsReporterConfig.getLong(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_RETENTION_MS_CONFIG).longValue()));
        hashMap.put(LogConfig.CleanupPolicyProp(), CRUISE_CONTROL_METRICS_TOPIC_CLEAN_UP_POLICY);
        if (sh2.shortValue() > 0) {
            if (sh.shortValue() < sh2.shortValue()) {
                throw new CruiseControlMetricsReporterException(String.format("The configured topic replication factor (%d) must be greater than or equal tothe configured topic minimum insync replicas (%d)", sh, sh2));
            }
            hashMap.put("min.insync.replicas", String.valueOf(sh2));
        }
        newTopic.configs(hashMap);
        return newTopic;
    }

    protected void createCruiseControlMetricsTopic() throws TopicExistsException {
        CruiseControlMetricsUtils.retry(() -> {
            try {
                ((KafkaFuture) this._adminClient.createTopics(Collections.singletonList(this._metricsTopic)).values().get(this._metricsTopic.name())).get(this._metricsTopicAutoCreateTimeoutMs, TimeUnit.MILLISECONDS);
                LOG.info("Cruise Control metrics topic {} is created.", this._metricsTopic.name());
                return false;
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                if (e.getCause() instanceof TopicExistsException) {
                    throw ((TopicExistsException) e.getCause());
                }
                LOG.warn("Unable to create Cruise Control metrics topic {}.", this._metricsTopic.name(), e);
                return true;
            }
        }, this._metricsTopicAutoCreateRetries);
    }

    protected void maybeUpdateCruiseControlMetricsTopic() {
        maybeUpdateTopicConfig();
        maybeIncreaseTopicPartitionCount();
    }

    protected void maybeUpdateTopicConfig() {
        try {
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this._cruiseControlMetricsTopic);
            Config config = (Config) ((KafkaFuture) this._adminClient.describeConfigs(Collections.singleton(configResource)).values().get(configResource)).get(10000L, TimeUnit.MILLISECONDS);
            HashSet hashSet = new HashSet(2);
            HashMap hashMap = new HashMap(2);
            hashMap.put(LogConfig.RetentionMsProp(), (String) this._metricsTopic.configs().get(LogConfig.RetentionMsProp()));
            hashMap.put(LogConfig.CleanupPolicyProp(), (String) this._metricsTopic.configs().get(LogConfig.CleanupPolicyProp()));
            CruiseControlMetricsUtils.maybeUpdateConfig(hashSet, hashMap, config);
            if (!hashSet.isEmpty()) {
                ((KafkaFuture) this._adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, hashSet)).values().get(configResource)).get(10000L, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.warn("Unable to update config of Cruise Cruise Control metrics topic {}", this._cruiseControlMetricsTopic, e);
        }
    }

    protected void maybeIncreaseTopicPartitionCount() {
        String name = this._metricsTopic.name();
        try {
            if (((TopicDescription) ((KafkaFuture) this._adminClient.describeTopics(Collections.singletonList(name)).values().get(name)).get(10000L, TimeUnit.MILLISECONDS)).partitions().size() < this._metricsTopic.numPartitions()) {
                this._adminClient.createPartitions(Collections.singletonMap(name, NewPartitions.increaseTo(this._metricsTopic.numPartitions())));
            }
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            Logger logger = LOG;
            Object[] objArr = new Object[4];
            objArr[0] = Integer.valueOf(this._metricsTopic.numPartitions());
            objArr[1] = name;
            objArr[2] = e.getCause() instanceof ReassignmentInProgressException ? " due to ongoing reassignment" : "";
            objArr[3] = e;
            logger.warn("Partition count increase to {} for topic {} failed{}.", objArr);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Starting Cruise Control metrics reporter with reporting interval of {} ms.", Long.valueOf(this._reportingIntervalMs));
        if (this._metricsTopic != null && this._adminClient != null) {
            try {
                createCruiseControlMetricsTopic();
            } catch (TopicExistsException e) {
                maybeUpdateCruiseControlMetricsTopic();
            } finally {
                CruiseControlMetricsUtils.closeAdminClientWithTimeout(this._adminClient);
            }
        }
        while (!this._shutdown) {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                LOG.debug("Reporting metrics for time {}.", Long.valueOf(currentTimeMillis));
                try {
                    if (currentTimeMillis > this._lastReportingTime + this._reportingIntervalMs) {
                        this._numMetricSendFailure = 0;
                        this._lastReportingTime = currentTimeMillis;
                        reportYammerMetrics(currentTimeMillis);
                        reportKafkaMetrics(currentTimeMillis);
                        reportCpuUtils(currentTimeMillis);
                    }
                    try {
                        this._producer.flush();
                    } catch (InterruptException e2) {
                        if (!this._shutdown) {
                            throw e2;
                            break;
                        }
                        LOG.info("Cruise Control metric reporter is interrupted during flush due to shutdown request.");
                    }
                } catch (Exception e3) {
                    LOG.error("Got exception in Cruise Control metrics reporter", e3);
                }
                if (this._numMetricSendFailure > 0) {
                    LOG.warn("Failed to send {} metrics for time {}", Integer.valueOf(this._numMetricSendFailure), Long.valueOf(currentTimeMillis));
                }
                this._numMetricSendFailure = 0;
                long j = currentTimeMillis + this._reportingIntervalMs;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reporting finished for time {} in {} ms. Next reporting time {}", new Object[]{Long.valueOf(currentTimeMillis), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Long.valueOf(j)});
                }
                while (!this._shutdown && currentTimeMillis < j) {
                    try {
                        Thread.sleep(j - currentTimeMillis);
                    } catch (InterruptedException e4) {
                    }
                    currentTimeMillis = System.currentTimeMillis();
                }
            } catch (Throwable th) {
                LOG.info("Cruise Control metrics reporter exited.");
                throw th;
            }
        }
        LOG.info("Cruise Control metrics reporter exited.");
    }

    public void sendCruiseControlMetric(final CruiseControlMetric cruiseControlMetric) {
        ProducerRecord producerRecord = new ProducerRecord(this._cruiseControlMetricsTopic, (Integer) null, Long.valueOf(cruiseControlMetric.time()), cruiseControlMetric.metricClassId() == CruiseControlMetric.MetricClassId.TOPIC_METRIC ? ((TopicMetric) cruiseControlMetric).topic() : Integer.toString(cruiseControlMetric.brokerId()), cruiseControlMetric);
        LOG.debug("Sending Cruise Control metric {}.", cruiseControlMetric);
        this._producer.send(producerRecord, new Callback() { // from class: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.1
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc != null) {
                    CruiseControlMetricsReporter.LOG.warn("Failed to send Cruise Control metric {}", cruiseControlMetric);
                    CruiseControlMetricsReporter.this._numMetricSendFailure++;
                }
            }
        });
    }

    private void reportYammerMetrics(long j) throws Exception {
        LOG.debug("Reporting yammer metrics.");
        YammerMetricProcessor.Context context = new YammerMetricProcessor.Context(this, j, this._brokerId, this._reportingIntervalMs);
        for (Map.Entry entry : this._metricsRegistry.allMetrics().entrySet()) {
            LOG.trace("Processing yammer metric {}, scope = {}", entry.getKey(), ((com.yammer.metrics.core.MetricName) entry.getKey()).getScope());
            ((Metric) entry.getValue()).processWith(this._yammerMetricProcessor, (com.yammer.metrics.core.MetricName) entry.getKey(), context);
        }
        LOG.debug("Finished reporting yammer metrics.");
    }

    private void reportKafkaMetrics(long j) {
        LOG.debug("Reporting KafkaMetrics. {}", this._interestedMetrics.values());
        Iterator<KafkaMetric> it = this._interestedMetrics.values().iterator();
        while (it.hasNext()) {
            sendCruiseControlMetric(MetricsUtils.toCruiseControlMetric(it.next(), j, this._brokerId));
        }
        LOG.debug("Finished reporting KafkaMetrics.");
    }

    private void reportCpuUtils(long j) throws IOException {
        LOG.debug("Reporting CPU util.");
        sendCruiseControlMetric(MetricsUtils.getCpuMetric(j, this._brokerId, this._kubernetesMode));
        LOG.debug("Finished reporting CPU util.");
    }

    private void addMetricIfInterested(KafkaMetric kafkaMetric) {
        LOG.trace("Checking Kafka metric {}", kafkaMetric.metricName());
        if (MetricsUtils.isInterested(kafkaMetric.metricName())) {
            LOG.debug("Added new metric {} to Cruise Control metrics reporter.", kafkaMetric.metricName());
            this._interestedMetrics.put(kafkaMetric.metricName(), kafkaMetric);
        }
    }

    private void setIfAbsent(Properties properties, String str, String str2) {
        if (properties.containsKey(str)) {
            return;
        }
        properties.setProperty(str, str2);
    }
}
